Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/30 22:11:26 UTC

[GitHub] [kafka] gharris1727 commented on a change in pull request #11369: KAFKA-13327, KAFKA-13328, KAFKA-13329: Clean up preflight connector validation

gharris1727 commented on a change in pull request #11369:
URL: https://github.com/apache/kafka/pull/11369#discussion_r719774645



##########
File path: clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
##########
@@ -1116,11 +1119,79 @@ public void ensureValid(String name, Object value) {
             }
         }
 
+        @Override
         public String toString() {
             return "non-empty string without ISO control characters";
         }
     }
 
+    public static class InstantiableClassValidator implements Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                // The value will be null if the class couldn't be found; no point in performing follow-up validation
+                return;
+            }
+
+            Class<?> cls = (Class<?>) value;
+            try {
+                cls.getDeclaredConstructor().newInstance();

Review comment:
       I hope these arbitrary classes that people put in here don't cause memory leaks from doing their own initialization.
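
   As an illustration of the concern (the class name `LeakyPlugin` is hypothetical), a no-arg constructor that acquires resources would leak them here, since the instance created by `getDeclaredConstructor().newInstance()` is discarded without any cleanup:

   ```java
   // Hypothetical plugin class, for illustration only.
   public class LeakyPlugin {
       public LeakyPlugin() {
           // A constructor that starts a non-daemon thread (or opens
           // files, sockets, etc.) leaks it: the validator only checks
           // that instantiation succeeds and never closes the instance.
           Thread background = new Thread(() -> {
               while (!Thread.currentThread().isInterrupted()) {
                   // background work
               }
           });
           background.start();
       }
   }
   ```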

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -344,12 +354,73 @@ public StatusBackingStore statusBackingStore() {
                 status.workerId(), status.trace());
     }
 
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
-                                                                    ConfigDef configDef,
-                                                                    Map<String, String> config) {
+    protected Map<String, ConfigValue> validateSinkConnectorConfig(ConfigDef configDef, Map<String, String> config) {
+        return SinkConnectorConfig.validate(configDef.validateAll(config), config);
+    }
+
+    protected Map<String, ConfigValue> validateSourceConnectorConfig(ConfigDef configDef, Map<String, String> config) {
         return configDef.validateAll(config);
     }
 
+    private ConfigInfos validateHeaderConverterConfig(Map<String, String> connectorConfig, ConfigValue headerConverterConfigValue) {
+        String headerConverterClass = connectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG);
+
+        if (headerConverterClass == null
+            || headerConverterConfigValue == null
+            || !headerConverterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom header converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        HeaderConverter headerConverter;
+        try {
+            headerConverter = Utils.newInstance(headerConverterClass, HeaderConverter.class);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate header converter class {}; this should have been caught by prior validation logic", headerConverterClass, e);
+            headerConverterConfigValue.addErrorMessage("Failed to load class " + headerConverterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        ConfigDef configDef;
+        try {
+            configDef = headerConverter.config();
+        } catch (RuntimeException e) {
+            log.error("Failed to load ConfigDef from header converter of type {}", headerConverterClass, e);
+            headerConverterConfigValue.addErrorMessage("Failed to load ConfigDef from header converter" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+        if (configDef == null) {
+            log.warn("{}.configDef() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", headerConverterClass);

Review comment:
       ```suggestion
               log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", headerConverterClass);
       ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##########
@@ -45,7 +45,7 @@
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
     public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
-    public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(5);

Review comment:
       Does this reduce test runtime? Might this introduce flakiness in a CI environment?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+
+/**
+ * Integration test for preflight connector config validation
+ */
+@Category(IntegrationTest.class)
+public class ConnectorValidationIntegrationTest {
+
+    private static final String WORKER_GROUP_ID = "connect-worker-group-id";
+
+    // Use a single embedded cluster for all test cases in order to cut down on runtime
+    private static EmbeddedConnectCluster connect;
+
+    @BeforeClass
+    public static void setup() {
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+            .name("connector-validation-connect-cluster")
+            .workerProps(workerProps)
+            .build();
+        connect.start();
+    }
+
+    @AfterClass
+    public static void close() {
+        if (connect != null) {
+            // stop all Connect, Kafka and Zk threads.
+            Utils.closeQuietly(connect::stop, "Embedded Connect cluster");
+        }
+    }
+
+    @Test
+    public void testSinkConnectorHasNeitherTopicsListNorTopicsRegex() {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.remove(TOPICS_CONFIG);
+        config.remove(TOPICS_REGEX_CONFIG);
+        connect.validateConnectorConfig(config.get(CONNECTOR_CLASS_CONFIG), config);

Review comment:
       Wait, where are the assertions? Isn't this supposed to fail?
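
   For comparison, the later tests in this file assert on the validation result. A sketch of what this case might look like using the same helper (the error count of 2 is an assumption based on the `SinkConnectorConfig.validate` logic shown later in this review, which attaches the error to both `topics` and `topics.regex`; the message text is illustrative):

   ```java
   connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
       config.get(CONNECTOR_CLASS_CONFIG),
       config,
       2, // assumed: one error each for topics and topics.regex
       "Sink connector config should fail preflight validation when neither topics list nor topics regex is set"
   );
   ```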

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
##########
@@ -0,0 +1,493 @@
+    @Test
+    public void testSinkConnectorHasBothTopicsListAndTopicsRegex() {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_CONFIG, "t1");
+        config.put(TOPICS_REGEX_CONFIG, "r.*");
+        connect.validateConnectorConfig(config.get(CONNECTOR_CLASS_CONFIG), config);
+    }
+
+    @Test
+    public void testSinkConnectorDeadLetterQueueTopicInTopicsList() {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_CONFIG, "t1");
+        config.put(DLQ_TOPIC_NAME_CONFIG, "t1");
+        connect.validateConnectorConfig(config.get(CONNECTOR_CLASS_CONFIG), config);
+    }
+
+    @Test
+    public void testSinkConnectorDeadLetterQueueTopicMatchesTopicsRegex() {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(TOPICS_REGEX_CONFIG, "r.*");
+        config.put(DLQ_TOPIC_NAME_CONFIG, "ruh.roh");
+        connect.validateConnectorConfig(config.get(CONNECTOR_CLASS_CONFIG), config);
+    }
+
+    @Test
+    public void testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        // Combined with the logic in SinkUtils::consumerGroupId, this should conflict with the worker group ID
+        config.put(NAME_CONFIG, "worker-group-id");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+            config.get(CONNECTOR_CLASS_CONFIG),
+            config,
+            1,
+            "Sink connector config should fail preflight validation when default consumer group ID conflicts with Connect worker group ID"
+        );
+    }
+
+    @Test
+    public void testSinkConnectorOverriddenGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + GROUP_ID_CONFIG, WORKER_GROUP_ID);
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+            config.get(CONNECTOR_CLASS_CONFIG),
+            config,
+            1,
+            "Sink connector config should fail preflight validation when overridden consumer group ID conflicts with Connect worker group ID"
+        );
+    }
+
+    @Test
+    public void testSourceConnectorHasDuplicateTopicCreationGroups() throws InterruptedException {
+        Map<String, String> config = defaultSourceConnectorProps();
+        config.put(TOPIC_CREATION_GROUPS_CONFIG, "g1, g2, g1");
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+            config.get(CONNECTOR_CLASS_CONFIG),
+            config,
+            1,
+            "Source connector config should fail preflight validation when the same topic creation group is specified multiple times"
+        );
+    }
+
+    @Test
+    // TODO: Is this actually necessary? Should we permit the same SMT to be applied multiple times?

Review comment:
       Was this a validation that already takes place, and you're just adding a test for it?
   I think this makes sense, because you can just work around it by renaming the transform and copy-pasting the transform configuration.
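
   A sketch of that workaround (the aliases `filterA`/`filterB` are hypothetical; `defaultSourceConnectorProps` is the helper used elsewhere in this test file): two distinct aliases for the same transform class with identical settings would pass a duplicate-name check while still applying the same SMT twice.

   ```java
   Map<String, String> config = defaultSourceConnectorProps();
   // Two distinct aliases, same transformation class and settings:
   // a check on alias uniqueness cannot tell this apart from two
   // genuinely different transforms.
   config.put(TRANSFORMS_CONFIG, "filterA,filterB");
   config.put(TRANSFORMS_CONFIG + ".filterA.type", Filter.class.getName());
   config.put(TRANSFORMS_CONFIG + ".filterB.type", Filter.class.getName());
   ```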

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##########
@@ -70,60 +73,105 @@
             "keys, all error context header keys will start with <code>__connect.errors.</code>";
     private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
 
-    static ConfigDef config = ConnectorConfig.configDef()
-        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
-        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
-        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
-        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
-
     public static ConfigDef configDef() {
-        return config;
+        return ConnectorConfig.configDef()
+            .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
+            .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+            .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+            .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+            .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
     }
 
     public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
-        super(plugins, config, props);
+        super(plugins, configDef(), props);
     }
 
     /**
      * Throw an exception if the passed-in properties do not constitute a valid sink.
      * @param props sink configuration properties
      */
     public static void validate(Map<String, String> props) {
-        final boolean hasTopicsConfig = hasTopicsConfig(props);
-        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
-        final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+        validate(
+            props,
+            error -> {
+                throw new ConfigException(error.property, error.value, error.errorMessage);
+            }
+        );
+    }
+
+    /**
+     * Perform preflight validation for the sink-specific properties for this connector.
+     */
+    public static Map<String, ConfigValue> validate(Map<String, ConfigValue> validatedConfig, Map<String, String> props) {
+        validate(props, error -> addErrorMessage(validatedConfig, error));
+        return validatedConfig;
+    }
+
+    private static void validate(Map<String, String> props, Consumer<ConfigError> onError) {
+        final String topicsList = props.get(TOPICS_CONFIG);
+        final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+        final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim();
+        final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+        final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+        final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);
 
         if (hasTopicsConfig && hasTopicsRegexConfig) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
+            String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
         }
 
         if (!hasTopicsConfig && !hasTopicsRegexConfig) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+            String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG;
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
         }
 
         if (hasDlqTopicConfig) {
-            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
             if (hasTopicsConfig) {
                 List<String> topics = parseTopicsList(props);
                 if (topics.contains(dlqTopic)) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+                    String errorMessage = String.format(
+                        "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector",
+                        dlqTopic, TOPICS_CONFIG, topics
+                    );
+                    onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
                 }
             }
             if (hasTopicsRegexConfig) {
-                String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG);
-                Pattern pattern = Pattern.compile(topicsRegexStr);
+                Pattern pattern = Pattern.compile(topicsRegex);
                 if (pattern.matcher(dlqTopic).matches()) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+                    String errorMessage = String.format(
+                        "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector",
+                        dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex
+                    );
+                    onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
                 }
             }
         }
     }
 
+    private static class ConfigError {

Review comment:
       This has the same contents as a plain ConfigException, but I think the reason you're using this class instead of the plain exception is to avoid the cost of instantiating the exception and filling in the stack trace. That's a fair consideration, but seeing as the existing ConfigDef infrastructure leverages the normal ConfigException in so many cases, I'm not sure the performance impact is substantial.
   
   It's also desirable to return multiple errors from this one method, and that's not possible using an exception. I think passing in `Map<String, ConfigValue>` and calling `addErrorMessage` directly as a utility function avoids the need for this POJO, while still letting you return multiple errors.
   
   Also, is there a reason we can't have this validation return a ConfigInfos object and merge it in later, like the rest of the validations taking place in `AbstractHerder::validateConnectorConfig`? That seems like the standard convention.
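
   A sketch of that Map-based alternative, assuming `ConfigValue`'s single-argument constructor and `addErrorMessage` method (both part of `org.apache.kafka.common.config.ConfigValue`):

   ```java
   private static void addErrorMessage(Map<String, ConfigValue> validatedConfig,
                                       String property, String errorMessage) {
       // Attach the error to the existing ConfigValue for this property,
       // creating one if the key wasn't present. Multiple errors can
       // accumulate this way without throwing, and without an
       // intermediate POJO.
       validatedConfig
           .computeIfAbsent(property, ConfigValue::new)
           .addErrorMessage(errorMessage);
   }
   ```

   Call sites would then pass the property name and message directly instead of wrapping them in a ConfigError.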




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org