You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/05 17:48:01 UTC

[jira] [Commented] (KAFKA-6253) Improve sink connector topic regex validation

    [ https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352691#comment-16352691 ] 

ASF GitHub Bot commented on KAFKA-6253:
---------------------------------------

ewencp closed pull request #4251: KAFKA-6253: Improve sink connector topic regex validation
URL: https://github.com/apache/kafka/pull/4251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 02465c922f1..b913f9ed65e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -256,9 +256,13 @@ public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) {
         Connector connector = getConnector(connType);
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
-            ConfigDef baseConfigDef = (connector instanceof SourceConnector)
-                    ? SourceConnectorConfig.configDef()
-                    : SinkConnectorConfig.configDef();
+            ConfigDef baseConfigDef;
+            if (connector instanceof SourceConnector) {
+                baseConfigDef = SourceConnectorConfig.configDef();
+            } else {
+                baseConfigDef = SinkConnectorConfig.configDef();
+                SinkConnectorConfig.validate(connectorProps);
+            }
             ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
             Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
                     connector,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index cf5564c25c2..887a4da2dea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
@@ -34,7 +35,7 @@
     public static final String TOPICS_DEFAULT = "";
     private static final String TOPICS_DISPLAY = "Topics";
 
-    private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
+    public static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
     private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
         "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
         "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
@@ -52,4 +53,34 @@ public static ConfigDef configDef() {
     public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
         super(plugins, config, props);
     }
+
+    /**
+     * Throw an exception if the passed-in properties do not constitute a valid sink.
+     * @param props sink configuration properties
+     */
+    public static void validate(Map<String, String> props) {
+        final boolean hasTopicsConfig = hasTopicsConfig(props);
+        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
+
+        if (hasTopicsConfig && hasTopicsRegexConfig) {
+            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
+                " are mutually exclusive options, but both are set.");
+        }
+
+        if (!hasTopicsConfig && !hasTopicsRegexConfig) {
+            throw new ConfigException("Must configure one of " +
+                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+        }
+    }
+
+    public static boolean hasTopicsConfig(Map<String, String> props) {
+        String topicsStr = props.get(TOPICS_CONFIG);
+        return topicsStr != null && !topicsStr.trim().isEmpty();
+    }
+
+    public static boolean hasTopicsRegexConfig(Map<String, String> props) {
+        String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
+        return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9b934f3428a..611e196d9de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -77,6 +77,9 @@ public void initialize(ConnectorConfig connectorConfig) {
         try {
             this.config = connectorConfig.originalsStrings();
             log.debug("{} Initializing connector {} with config {}", this, connName, config);
+            if (isSinkConnector()) {
+                SinkConnectorConfig.validate(config);
+            }
 
             connector.initialize(new ConnectorContext() {
                 @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 69614948147..39e17e32a5f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -259,27 +258,14 @@ public int commitFailures() {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
-        String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
-        boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
+        SinkConnectorConfig.validate(taskConfig);
 
-        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
-        boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
-
-        if (topicsStrPresent && topicsRegexStrPresent) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
-        }
-
-        if (!topicsStrPresent && !topicsRegexStrPresent) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
-        }
-
-        if (topicsStrPresent) {
-            String[] topics = topicsStr.split(",");
+        if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
+            String[] topics = taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
             consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
             log.debug("{} Initializing and starting task for topics {}", this, topics);
         } else {
+            String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
             Pattern pattern = Pattern.compile(topicsRegexStr);
             consumer.subscribe(pattern, new HandleRebalance());
             log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 500d31f42e0..97cc7d94448 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -175,6 +176,21 @@ public void testConfigValidationMissingName() {
         verifyAll();
     }
 
+    @Test(expected = ConfigException.class)
+    public void testConfigValidationInvalidTopics() {
+        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
+        replayAll();
+
+        Map<String, String> config = new HashMap();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
+        config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
+
+        herder.validateConnectorConfig(config);
+
+        verifyAll();
+    }
+
     @Test()
     public void testConfigValidationTransformsExtendResults() {
         AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d7307cfdbc4..d7a7d87cb4f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -355,7 +355,7 @@ public void testCreateConnector() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -398,7 +398,7 @@ public void testCreateConnectorFailedBasicValidation() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -443,7 +443,7 @@ public void testCreateConnectorFailedCustomValidation() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -1338,7 +1338,7 @@ public void testPutConnectorConfig() throws Exception {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 79be45b3290..fd330f28009 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -125,7 +125,8 @@ public void testCreateSourceConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         PowerMock.replayAll();
 
@@ -142,7 +143,7 @@ public void testCreateConnectorFailedBasicValidation() throws Exception {
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -167,7 +168,7 @@ public void testCreateConnectorFailedBasicValidation() throws Exception {
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -199,7 +200,7 @@ public void testCreateConnectorAlreadyExists() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
@@ -224,7 +225,8 @@ public void testCreateSinkConnector() throws Exception {
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        expectConfigValidation(connectorMock, true, config);
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
@@ -238,7 +240,8 @@ public void testDestroyConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -270,7 +273,8 @@ public void testRestartConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -295,7 +299,8 @@ public void testRestartConnectorFailureOnStart() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -326,7 +331,8 @@ public void testRestartTask() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -351,7 +357,8 @@ public void testRestartTaskFailureOnStart() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -381,7 +388,8 @@ public void testCreateAndStop() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
@@ -402,6 +410,7 @@ public void testCreateAndStop() throws Exception {
     @Test
     public void testAccessors() throws Exception {
         Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
+        System.out.println(connConfig);
 
         Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
         Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -421,7 +430,8 @@ public void testAccessors() throws Exception {
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connector, true, connConfig);
 
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
@@ -467,7 +477,7 @@ public void testPutConnectorConfig() throws Exception {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
@@ -526,7 +536,8 @@ public void testCorruptConfig() {
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         String error = "This is an error in your config!";
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
@@ -592,7 +603,7 @@ private void expectAdd(SourceSink sourceSink) throws Exception {
         EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
             .andReturn(ConnectorType.SOURCE).anyTimes();
         EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
-        .andReturn(ConnectorType.SINK).anyTimes();
+            .andReturn(ConnectorType.SINK).anyTimes();
         worker.isSinkConnector(CONNECTOR_NAME);
         PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
     }
@@ -631,12 +642,6 @@ private void expectDestroy() {
         return generatedTaskProps;
     }
 
-
-    private void expectConfigValidation(Map<String, String> ... configs) {
-        Connector connectorMock = PowerMock.createMock(Connector.class);
-        expectConfigValidation(connectorMock, true, configs);
-    }
-
     private void expectConfigValidation(
             Connector connectorMock,
             boolean shouldCreateConnector,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve sink connector topic regex validation
> ---------------------------------------------
>
>                 Key: KAFKA-6253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6253
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Jeff Klukas
>            Priority: Major
>             Fix For: 1.1.0, 1.2.0
>
>
> KAFKA-3073 adds topic regex support for sink connectors. The addition requires that you only specify one of topics or topics.regex settings. This is being validated in one place, but not during submission of connectors. We should improve this since this means it's possible to get a bad connector config into the config topic.
> For more detailed discussion, see https://github.com/apache/kafka/pull/4151#pullrequestreview-77300221



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)