You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/16 15:02:23 UTC
[kafka] branch 2.7 updated: KAFKA-10600: Connect should not add error to connector validation values for properties not in connector’s ConfigDef (#9425)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 6b10979 KAFKA-10600: Connect should not add error to connector validation values for properties not in connector’s ConfigDef (#9425)
6b10979 is described below
commit 6b109795ed707792dc32bece7c51188960d969a5
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Oct 16 09:14:43 2020 -0500
KAFKA-10600: Connect should not add error to connector validation values for properties not in connector’s ConfigDef (#9425)
Connect should not always add an error to configuration values in validation results that don't have a `ConfigKey` defined in the connector's `ConfigDef`, and any errors on such configuration values included by the connector should be counted in the total number of errors. Added more unit tests for `AbstractHerder.generateResult(...)`.
Author: Randall Hauch <rh...@gmail.com>
Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
.../kafka/connect/runtime/AbstractHerder.java | 2 +-
.../kafka/connect/runtime/AbstractHerderTest.java | 173 ++++++++++++++++++++-
2 files changed, 173 insertions(+), 2 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index d5cc3de..aa348f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -502,8 +502,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
String configName = configValue.name();
configValueMap.put(configName, configValue);
if (!configKeys.containsKey(configName)) {
- configValue.addErrorMessage("Configuration is not defined: " + configName);
configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue, null)));
+ errorCount += configValue.errorMessages().size();
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 1fbd1bf..3943f9d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigTransformer;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -34,6 +35,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -66,6 +68,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
+import static org.junit.Assert.assertNull;
import static org.powermock.api.easymock.PowerMock.verifyAll;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.easymock.EasyMock.strictMock;
@@ -606,10 +609,178 @@ public class AbstractHerderTest {
testConfigProviderRegex("plain.PlainLoginModule required username", false);
}
+ /**
+  * Every supplied value has a matching {@code ConfigKey} and none carries an error:
+  * the result must echo the name and groups, contain one entry per value, and
+  * report an error count of zero.
+  */
+ @Test
+ public void testGenerateResultWithConfigValuesAllUsingConfigKeysAndWithNoErrors() {
+     String name = "com.acme.connector.MyConnector";
+     Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
+     addConfigKey(keys, "config.a1", null);
+     addConfigKey(keys, "config.b1", "group B");
+     addConfigKey(keys, "config.b2", "group B");
+     addConfigKey(keys, "config.c1", "group C");
+
+     // Group names are spelled exactly as on the keys registered above.
+     List<String> groups = Arrays.asList("group B", "group C");
+     List<ConfigValue> values = new ArrayList<>();
+     addValue(values, "config.a1", "value.a1");
+     addValue(values, "config.b1", "value.b1");
+     addValue(values, "config.b2", "value.b2");
+     addValue(values, "config.c1", "value.c1");
+
+     ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
+     assertEquals(name, infos.name());
+     assertEquals(groups, infos.groups());
+     assertEquals(values.size(), infos.values().size());
+     assertEquals(0, infos.errorCount());
+     assertInfoKey(infos, "config.a1", null);
+     assertInfoKey(infos, "config.b1", "group B");
+     assertInfoKey(infos, "config.b2", "group B");
+     assertInfoKey(infos, "config.c1", "group C");
+     assertInfoValue(infos, "config.a1", "value.a1");
+     assertInfoValue(infos, "config.b1", "value.b1");
+     assertInfoValue(infos, "config.b2", "value.b2");
+     assertInfoValue(infos, "config.c1", "value.c1");
+ }
+
+ /**
+  * Every supplied value has a matching {@code ConfigKey} and exactly one value
+  * ("config.c1") carries an error: the error count must be 1 and the error
+  * message must be reported on that value.
+  */
+ @Test
+ public void testGenerateResultWithConfigValuesAllUsingConfigKeysAndWithSomeErrors() {
+     String name = "com.acme.connector.MyConnector";
+     Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
+     addConfigKey(keys, "config.a1", null);
+     addConfigKey(keys, "config.b1", "group B");
+     addConfigKey(keys, "config.b2", "group B");
+     addConfigKey(keys, "config.c1", "group C");
+
+     // Group names are spelled exactly as on the keys registered above.
+     List<String> groups = Arrays.asList("group B", "group C");
+     List<ConfigValue> values = new ArrayList<>();
+     addValue(values, "config.a1", "value.a1");
+     addValue(values, "config.b1", "value.b1");
+     addValue(values, "config.b2", "value.b2");
+     addValue(values, "config.c1", "value.c1", "error c1");
+
+     ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
+     assertEquals(name, infos.name());
+     assertEquals(groups, infos.groups());
+     assertEquals(values.size(), infos.values().size());
+     assertEquals(1, infos.errorCount());
+     assertInfoKey(infos, "config.a1", null);
+     assertInfoKey(infos, "config.b1", "group B");
+     assertInfoKey(infos, "config.b2", "group B");
+     assertInfoKey(infos, "config.c1", "group C");
+     assertInfoValue(infos, "config.a1", "value.a1");
+     assertInfoValue(infos, "config.b1", "value.b1");
+     assertInfoValue(infos, "config.b2", "value.b2");
+     assertInfoValue(infos, "config.c1", "value.c1", "error c1");
+ }
+
+ /**
+  * Two of the supplied values ("config.extra1", "config.extra2") have no
+  * {@code ConfigKey}. They must still appear in the result without a key, must
+  * not gain any extra error, and the error already present on "config.extra2"
+  * must be counted together with the one on "config.c1" (total of 2).
+  */
+ @Test
+ public void testGenerateResultWithConfigValuesMoreThanConfigKeysAndWithSomeErrors() {
+     String name = "com.acme.connector.MyConnector";
+     Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
+     addConfigKey(keys, "config.a1", null);
+     addConfigKey(keys, "config.b1", "group B");
+     addConfigKey(keys, "config.b2", "group B");
+     addConfigKey(keys, "config.c1", "group C");
+
+     // Group names are spelled exactly as on the keys registered above.
+     List<String> groups = Arrays.asList("group B", "group C");
+     List<ConfigValue> values = new ArrayList<>();
+     addValue(values, "config.a1", "value.a1");
+     addValue(values, "config.b1", "value.b1");
+     addValue(values, "config.b2", "value.b2");
+     addValue(values, "config.c1", "value.c1", "error c1");
+     addValue(values, "config.extra1", "value.extra1");
+     addValue(values, "config.extra2", "value.extra2", "error extra2");
+
+     ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
+     assertEquals(name, infos.name());
+     assertEquals(groups, infos.groups());
+     assertEquals(values.size(), infos.values().size());
+     assertEquals(2, infos.errorCount());
+     assertInfoKey(infos, "config.a1", null);
+     assertInfoKey(infos, "config.b1", "group B");
+     assertInfoKey(infos, "config.b2", "group B");
+     assertInfoKey(infos, "config.c1", "group C");
+     assertNoInfoKey(infos, "config.extra1");
+     assertNoInfoKey(infos, "config.extra2");
+     assertInfoValue(infos, "config.a1", "value.a1");
+     assertInfoValue(infos, "config.b1", "value.b1");
+     assertInfoValue(infos, "config.b2", "value.b2");
+     assertInfoValue(infos, "config.c1", "value.c1", "error c1");
+     assertInfoValue(infos, "config.extra1", "value.extra1");
+     assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
+ }
+
+ @Test
+ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors() {
+ String name = "com.acme.connector.MyConnector";
+ Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
+
+ List<String> groups = new ArrayList<>();
+ List<ConfigValue> values = new ArrayList<>();
+ addValue(values, "config.a1", "value.a1");
+ addValue(values, "config.b1", "value.b1");
+ addValue(values, "config.b2", "value.b2");
+ addValue(values, "config.c1", "value.c1", "error c1");
+ addValue(values, "config.extra1", "value.extra1");
+ addValue(values, "config.extra2", "value.extra2", "error extra2");
+
+ ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);
+ assertEquals(name, infos.name());
+ assertEquals(groups, infos.groups());
+ assertEquals(values.size(), infos.values().size());
+ assertEquals(2, infos.errorCount());
+ assertNoInfoKey(infos, "config.a1");
+ assertNoInfoKey(infos, "config.b1");
+ assertNoInfoKey(infos, "config.b2");
+ assertNoInfoKey(infos, "config.c1");
+ assertNoInfoKey(infos, "config.extra1");
+ assertNoInfoKey(infos, "config.extra2");
+ assertInfoValue(infos, "config.a1", "value.a1");
+ assertInfoValue(infos, "config.b1", "value.b1");
+ assertInfoValue(infos, "config.b2", "value.b2");
+ assertInfoValue(infos, "config.c1", "value.c1", "error c1");
+ assertInfoValue(infos, "config.extra1", "value.extra1");
+ assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
+ }
+
+ /**
+  * Builds a STRING-typed, HIGH-importance {@code ConfigKey} for {@code name} in
+  * the given {@code group} (may be null) and stores it in {@code keys} under
+  * that name.
+  */
+ protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
+     ConfigDef.ConfigKey key = new ConfigDef.ConfigKey(
+         name, ConfigDef.Type.STRING, null, null,
+         ConfigDef.Importance.HIGH, "doc", group, 10,
+         ConfigDef.Width.MEDIUM, "display name", Collections.emptyList(), null, false);
+     keys.put(name, key);
+ }
+
+ /**
+  * Appends a {@code ConfigValue} with the given name, value and error messages
+  * (an empty recommended-values list is passed as the third argument).
+  */
+ protected void addValue(List<ConfigValue> values, String name, String value, String...errors) {
+     // Arrays.asList is a fixed-size view over the varargs array; copy it so the
+     // list handed to ConfigValue is an independent, growable list and a later
+     // attempt to add an error message cannot fail with UnsupportedOperationException.
+     List<String> errorMessages = new ArrayList<>(Arrays.asList(errors));
+     values.add(new ConfigValue(name, value, new ArrayList<>(), errorMessages));
+ }
+
+ /**
+  * Asserts that the entry whose value is named {@code name} has a config key
+  * with that same name and with the expected {@code group}.
+  */
+ protected void assertInfoKey(ConfigInfos infos, String name, String group) {
+     ConfigInfo match = findInfo(infos, name);
+     assertEquals(name, match.configKey().name());
+     assertEquals(group, match.configKey().group());
+ }
+
+ /** Asserts that the entry whose value is named {@code name} has no config key. */
+ protected void assertNoInfoKey(ConfigInfos infos, String name) {
+     assertNull(findInfo(infos, name).configKey());
+ }
+
+ /**
+  * Asserts that the entry whose value is named {@code name} reports the given
+  * {@code value} and exactly the given {@code errors}, in order.
+  */
+ protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) {
+     ConfigValueInfo actual = findInfo(infos, name).configValue();
+     List<String> expectedErrors = Arrays.asList(errors);
+     assertEquals(name, actual.name());
+     assertEquals(value, actual.value());
+     assertEquals(expectedErrors, actual.errors());
+ }
+
+ /**
+  * Returns the first entry in {@code infos} whose config value is named {@code name}.
+  *
+  * @throws AssertionError if no entry has that name, so a missing entry fails the
+  *         test with a descriptive message rather than a NullPointerException in
+  *         the calling assertion helper
+  */
+ protected ConfigInfo findInfo(ConfigInfos infos, String name) {
+     return infos.values()
+         .stream()
+         .filter(i -> i.configValue().name().equals(name))
+         .findFirst()
+         .orElseThrow(() -> new AssertionError("No ConfigInfo found for config value: " + name));
+ }
+
/** Convenience overload: delegates to the two-argument variant with {@code expected = true}. */
private void testConfigProviderRegex(String rawConnConfig) {
testConfigProviderRegex(rawConnConfig, true);
}
-
+
private void testConfigProviderRegex(String rawConnConfig, boolean expected) {
Set<String> keys = keysWithVariableValues(Collections.singletonMap("key", rawConnConfig), ConfigTransformer.DEFAULT_PATTERN);
boolean actual = keys != null && !keys.isEmpty() && keys.contains("key");