You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/08/02 18:04:03 UTC

[kafka] branch trunk updated: KAFKA-13546: Do not fail connector validation if default topic creation group is explicitly specified (#11615)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a02c8d336a KAFKA-13546: Do not fail connector validation if default topic creation group is explicitly specified (#11615)
a02c8d336a is described below

commit a02c8d336a1370714bc13396e4dad5dbfb237543
Author: venkatteki <76...@users.noreply.github.com>
AuthorDate: Tue Aug 2 23:33:57 2022 +0530

    KAFKA-13546: Do not fail connector validation if default topic creation group is explicitly specified (#11615)
    
    Reviewers: Chris Egerton <fe...@gmail.com>
---
 .../apache/kafka/connect/runtime/SourceConnectorConfig.java | 11 +++++++++++
 .../kafka/connect/runtime/SourceConnectorConfigTest.java    | 13 +++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 65d376b08f..2115bda662 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED;
 import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
@@ -48,6 +50,8 @@ import static org.apache.kafka.common.utils.Utils.enumOptions;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
+    private static final Logger log = LoggerFactory.getLogger(SourceConnectorConfig.class);
+
     protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
 
     public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
@@ -223,6 +227,13 @@ public class SourceConnectorConfig extends ConnectorConfig {
             topicCreationGroups.addAll((List<?>) aliases);
         }
 
+        // Drop the "default" group from the "topic.creation.groups" list if it was listed explicitly
+        if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) {
+            log.warn("'{}' topic creation group always exists and does not need to be listed explicitly",
+                DEFAULT_TOPIC_CREATION_GROUP);
+            topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP));
+        }
+
         ConfigDef newDef = new ConfigDef(baseConfigDef);
         String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
         short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
index 1972b62e81..251bb72fbe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
@@ -33,6 +33,7 @@ import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfigTest.MOCK_PLUGINS;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
@@ -47,6 +48,8 @@ import static org.junit.Assert.assertTrue;
 public class SourceConnectorConfigTest {
 
     private static final String FOO_CONNECTOR = "foo-source";
+    private static final String TOPIC_CREATION_GROUP_1 = "group1";
+    private static final String TOPIC_CREATION_GROUP_2 = "group2";
     private static final short DEFAULT_REPLICATION_FACTOR = -1;
     private static final int DEFAULT_PARTITIONS = -1;
 
@@ -64,6 +67,16 @@ public class SourceConnectorConfigTest {
         return props;
     }
 
+    @Test
+    public void shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() {
+        Map<String, String> props = defaultConnectorProps();
+        props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", DEFAULT_TOPIC_CREATION_GROUP,
+            TOPIC_CREATION_GROUP_1, TOPIC_CREATION_GROUP_2));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, "1");
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, "1");
+        SourceConnectorConfig config = new SourceConnectorConfig(MOCK_PLUGINS, props, true);
+    }
+
     @Test
     public void noTopicCreation() {
         Map<String, String> props = defaultConnectorProps();