Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/24 02:10:01 UTC

[GitHub] [kafka] kkonstantine opened a new pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

kkonstantine opened a new pull request #8722:
URL: https://github.com/apache/kafka/pull/8722


   Kafka Connect workers have been able to create Connect's internal topics using the new admin client for some time now (see KAFKA-4667). However, the tasks of source connectors still rely on the broker to auto-create topics with default config settings if they don't exist, or they require these topics to be created before the connector is deployed if the topics need specialized configuration. 
   
   With the implementation of KIP-158 in this PR, if `topic.creation.enable=true`, Kafka Connect supplies the source tasks of connectors that are configured to create topics with an admin client, allowing them to create new topics on the fly before the first source records are written to a new topic. Additionally, each source connector can customize the topic-specific settings of these new topics by defining groups of topic configurations; an illustrative configuration sketch follows below. 
   
   This feature is tested here via unit tests (both adjusted existing tests and new ones) as well as integration tests.
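   
   As a rough, illustrative sketch (the property names below follow KIP-158; the group name, regex, and values are made-up examples), a source connector that opts into topic creation could be configured along these lines:
   
       topic.creation.groups=compacted
       # defaults applied to every new topic this connector's tasks create
       topic.creation.default.replication.factor=3
       topic.creation.default.partitions=5
       # the 'compacted' group overrides the defaults for topics matching its include regex
       topic.creation.compacted.include=offset-.*
       topic.creation.compacted.partitions=1
       topic.creation.compacted.cleanup.policy=compact
   
   A new topic that matches a group's include regex (and not its exclude regex) is created with that group's settings; any other new topic falls back to the default group, provided the worker itself runs with `topic.creation.enable=true`.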
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] kkonstantine commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r430089754



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Definitely. I'm in favor too if we don't mind the change. Added `connector-dlq-adminclient-`

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +192,76 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final String name;
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            this.name = group;
+            this.inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            this.exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));
+            this.numPartitions = config.topicCreationPartitions(group);
+            this.replicationFactor = config.topicCreationReplicationFactor(group);
+            this.otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }
+
+        public static Map<String, NewTopicCreationGroup> configuredGroups(SourceConnectorConfig config) {
+            List<String> groupNames = config.getList(TOPIC_CREATION_GROUPS_CONFIG);
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>();
+            for (String group : groupNames) {
+                groups.put(group, new NewTopicCreationGroup(group, config));
+            }
+            // Even if there was a group called 'default' in the config, it will be overridden here.
+            // Order matters for all the topic groups besides the default, since it will be
+            // removed from this collection by the Worker
+            groups.put(DEFAULT_TOPIC_CREATION_GROUP, new NewTopicCreationGroup(DEFAULT_TOPIC_CREATION_GROUP, config));
+            return groups;
+        }

Review comment:
       I have to admit two things: 
   a) I tried it when I wrote it, because I remembered that auto formatting in IntelliJ applies a similar type of ordering (although I can't reproduce it with my current settings). 
   b) I'm not a huge fan. I think I prefer the ordering that says: 
   static member fields first, followed by non-static member fields, then constructors, then some logical ordering of the rest of the methods. I can see factory methods following constructors, but other than that I think a logical ordering of methods gives us good flexibility. 
   
   I checked in case I had missed a recently updated guideline in the [Google Java Style Guide](https://google.github.io/styleguide/javaguide.html#s3.4.2-ordering-class-contents), which we loosely follow, or elsewhere. I found another old but reasonable [recommendation](https://www.oracle.com/technetwork/java/codeconventions-150003.pdf), but I didn't find any clear guideline that places static methods before constructors, member fields, and other methods. Given that this imposes an ordering other than a logical one for methods, I feel we could skip that one. wdyt?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());

Review comment:
       Good observation. Done. 
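
       For context on the batching comment above: a transformation such as `RegexRouter` can rewrite each record's destination topic, so the creation group has to be resolved per topic as records are sent. A minimal, illustrative SMT configuration (names and values made up) might look like:

           transforms=route
           transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
           transforms.route.regex=(.*)
           transforms.route.replacement=prefix-$1

       With a transform like this in place, the topic that `maybeCreateTopic` sees is the record's final destination topic after transformations, not necessarily the topic originally set on the source record.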

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);
+            log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);

Review comment:
       👍 Good catch for a second time. I added it, then removed it after I added the `name` field and didn't add it back. It should be there now. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));
+            numPartitions = config.topicCreationPartitions(group);
+            replicationFactor = config.topicCreationReplicationFactor(group);
+            otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }

Review comment:
       Added for real this time :)

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }

Review comment:
       See my reply on this subject in the similar comment. This factory method is right below the constructor here. 

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
##########
@@ -0,0 +1,1490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreation;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
+    private static final String TOPIC = "topic";
+    private static final String OTHER_TOPIC = "other-topic";
+    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
+
+    // Connect-format data
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final Integer KEY = -1;
+    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+    private static final Long RECORD = 12L;
+    // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
+    // is used in the right place.
+    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
+
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+    private WorkerConfig config;
+    private SourceConnectorConfig sourceConfig;
+    private Plugins plugins;
+    private MockConnectMetrics metrics;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
+    @Mock private TransformationChain<SourceRecord> transformationChain;
+    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private TopicAdmin admin;
+    @Mock private CloseableOffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
+    private WorkerSourceTask workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
+    @Mock private StatusBackingStore statusBackingStore;
+
+    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+    );
+
+    // when this test becomes parameterized, this variable will be a test parameter
+    public boolean enableTopicCreation = true;
+
+    @Override
+    public void setup() {
+        super.setup();
+        Map<String, String> workerProps = workerProps();
+        plugins = new Plugins(workerProps);
+        config = new StandaloneConfig(workerProps);
+        sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
+        producerCallbacks = EasyMock.newCapture();
+        metrics = new MockConnectMetrics();
+    }
+
+    private Map<String, String> workerProps() {
+        Map<String, String> props = new HashMap<>();
+        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
+        return props;
+    }
+
+    private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "foo-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+        props.put(TOPIC_CONFIG, topic);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic);
+        return props;
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
+    private void createWorkerTask() {
+        createWorkerTask(TargetState.STARTED);
+    }
+
+    private void createWorkerTask(TargetState initialState) {
+        createWorkerTask(initialState, keyConverter, valueConverter, headerConverter);
+    }
+
+    private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
+                transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
+                offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+    }
+
+    @Test
+    public void testStartPaused() throws Exception {
+        final CountDownLatch pauseLatch = new CountDownLatch(1);
+
+        createWorkerTask(TargetState.PAUSED);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                pauseLatch.countDown();
+                return null;
+            }
+        });
+
+        expectClose();
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        AtomicInteger count = new AtomicInteger(0);
+        CountDownLatch pollLatch = expectPolls(10, count);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        expectTopicCreation(TOPIC);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+        assertTrue(awaitLatch(pollLatch));
+
+        workerTask.transitionTo(TargetState.PAUSED);
+
+        int priorCount = count.get();
+        Thread.sleep(100);
+
+        // since the transition is observed asynchronously, the count could be off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
+
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(10);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = new CountDownLatch(1);
+        final RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
+            @Override
+            public List<SourceRecord> answer() throws Throwable {
+                pollLatch.countDown();
+                throw exception;
+            }
+        });
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollReturnsNoRecords() throws Exception {
+        // Test that the task handles an empty list of records
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger());
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(false);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsPropagatesTimestamp() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(timestamp, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testSendRecordsCorruptTimestamp() throws Exception {
+        final Long timestamp = -3L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsNoTimestamp() throws Exception {
+        final Long timestamp = -1L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsRetries() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // First round
+        expectSendRecordOnce(false);
+        // Any Producer retriable exception should work here
+        expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+        // Second round
+        expectSendRecordOnce(true);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record2, record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerSendFailsImmediately() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        expectTopicCreation(TOPIC);
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject()))
+                .andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTaskCommitRecordFail() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // Source task commit record failure will not cause the task to abort
+        expectSendRecordOnce(false);
+        expectSendRecordTaskCommitRecordFail(false, false);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSlowTaskStart() throws Exception {
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch finishStartupLatch = new CountDownLatch(1);
+
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                startupLatch.countDown();
+                assertTrue(awaitLatch(finishStartupLatch));
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> workerTaskFuture = executor.submit(workerTask);
+
+        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+        // cannot be invoked immediately in the thread trying to stop the task.
+        assertTrue(awaitLatch(startupLatch));
+        workerTask.stop();
+        finishStartupLatch.countDown();
+        assertTrue(workerTask.awaitStop(1000));
+
+        workerTaskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancel() {
+        createWorkerTask();
+
+        offsetReader.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.cancel();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testMetricsGroup() {
+        SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordPoll(100, 1000 + i * 100);
+            group.recordWrite(10);
+        }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
+        assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
+    }
+
+    @Test
+    public void testHeaders() throws Exception {
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders();
+        connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
+
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertEquals(headers, sent.getValue().headers());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() throws Exception {
+        StringConverter stringConverter = new StringConverter();
+        TestConverterWithHeaders testConverter = new TestConverterWithHeaders();
+
+        createWorkerTask(TargetState.STARTED, stringConverter, testConverter, stringConverter);
+
+        List<SourceRecord> records = new ArrayList<>();
+
+        String stringA = "Árvíztűrő tükörfúrógép";
+        org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
+        String encodingA = "latin2";
+        headersA.addString("encoding", encodingA);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
+
+        String stringB = "Тестовое сообщение";
+        org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
+        String encodingB = "koi8_r";
+        headersB.addString("encoding", encodingB);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+
+        assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringA.getBytes(encodingA)),
+            ByteBuffer.wrap(sentRecordA.getValue().value())
+        );
+        assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
+
+        assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringB.getBytes(encodingB)),
+            ByteBuffer.wrap(sentRecordB.getValue().value())
+        );
+        assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTopicCreateWhenTopicExists() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        // First round - call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round - calls to describe and create succeed
+        expectTopicCreation(TOPIC);
+        // Exactly two records are sent
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First call to create the topic times out
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        // First call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
+        // First call to create the topic times out
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicDescribeFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsEnabled() {
+        TopicCreationGroup expectedDefaultGroup =
+                TopicCreationGroup.configuredGroups(sourceConfig).get(DEFAULT_TOPIC_CREATION_GROUP);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(TOPIC));
+        assertThat(topicCreation.defaultTopicGroup(), is(expectedDefaultGroup));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems("foo", "bar"));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsDisabled() {
+        Map<String, String> workerProps = workerProps();
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        config = new StandaloneConfig(workerProps);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testEmptyTopicCreationClass() {
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config, null);
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }

Review comment:
       Indeed. That's in my latest commit. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,157 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
+    }
+
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    @Override
+    public Object get(String key) {

Review comment:
       Good catch on that one. I'll move it below. The config classes tend to have the most haphazard layout.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       Accepting, and I'll fix the typo in a follow-up commit.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";

Review comment:
       Accepting, and I'll fix the typo in a follow-up commit.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);

Review comment:
       Lmk if it looks ok now. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in regular expression");

Review comment:
       Yeah, sure. I missed that we append the exception message in other cases. This exception does not accept a Throwable as a cause.
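   For reference, roughly what I have in mind is to append the underlying message, since `ConfigException` has no constructor that takes a `Throwable`; the exact wording below is only a sketch:
   ```java
   @SuppressWarnings("unchecked")
   public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
       (name, value) -> {
           try {
               ((List<String>) value).forEach(Pattern::compile);
           } catch (PatternSyntaxException e) {
               // Surface the regex error by appending its message, since
               // ConfigException cannot carry a Throwable cause.
               throw new ConfigException(name, value,
                       "Syntax error in regular expression: " + e.getMessage());
           }
       },
       () -> "Valid regular expressions"
   );
   ```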

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
+            + "source connectors to create topics by specifying topic creation properties "
+            + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will use an "
+            + "admin client to create its topics and will not depend on the Kafka brokers "
+            + "to create topics automatically.";

Review comment:
       Accepted the suggestion here, and I'll return to fix the typo in `source connector`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       I assume you mean `match the topic names`.
   I added what I had in the KIP here. I agree it's a bit convoluted.
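   To make the interplay a bit more concrete, here is a hypothetical example; the group alias `inorder` and the topic names are made up:
   ```java
   // Hypothetical connector properties showing how include/exclude interact.
   Map<String, String> props = new HashMap<>();
   props.put("topic.creation.groups", "inorder");
   props.put("topic.creation.inorder.include", "inorder\\..*");
   props.put("topic.creation.inorder.exclude", "inorder\\.unordered\\..*");
   // A new topic "inorder.messages" gets this group's settings, while
   // "inorder.unordered.messages" does not, because exclusion rules take
   // precedence over inclusion rules.
   ```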

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);

Review comment:
       True. I kept what we were doing. Backports won't be easy anyway, due to conflicts from `trunk` to `2.5`. I'll merge here and add it to the list of things we should consider backporting to `2.5` and earlier.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
                 counter.retryRemaining();
                 return false;
+            } catch (ConnectException e) {
+                log.warn("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);

Review comment:
       Changed as above. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -166,11 +176,14 @@ protected void close() {
                 log.warn("Could not close producer", t);
             }
         }
-        try {
-            transformationChain.close();
-        } catch (Throwable t) {
-            log.warn("Could not close transformation chain", t);
+        if (admin != null) {
+            try {
+                admin.close(Duration.ofSeconds(30));
+            } catch (Throwable t) {
+                log.warn("Failed to close admin client on time", t);
+            }

Review comment:
       We don't do that with other clients or in `closeQuietly`.
   I'd say we don't need to, because although it's not required by `AutoCloseable`, the admin implementation of `close` appears to be idempotent.
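   For completeness, the pattern I mean is just a null check plus a swallowed exception; a hypothetical helper (not the actual `closeQuietly` signature) would look like:
   ```java
   // Hypothetical helper, shown only to illustrate the pattern: a null check and
   // a swallowed exception are enough, since the admin implementation of close
   // appears to be idempotent.
   static void closeQuietly(AutoCloseable closeable, String name, org.slf4j.Logger log) {
       if (closeable == null) {
           return;
       }
       try {
           closeable.close();
       } catch (Throwable t) {
           log.warn("Could not close {}", name, t);
       }
   }
   ```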

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,167 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
         super(plugins, config, props);
+        if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
+            ConfigDef defaultConfigDef = embedDefaultGroup(config);
+            // This config is only used to set default values for partitions and replication
+            // factor from the default group and otherwise it remains unused
+            AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, props, false);
+
+            // If the user has added regex of include or exclude patterns in the default group,
+            // they should be ignored.

Review comment:
       TDD :) 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "
+            + "used to match the names topics used by the source connector. This list is used "
+            + "to include topics that should be created using the topic settings defined by this group.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "

Review comment:
       NP. Fixed. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in regular expression");
+            }
+        },
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the broker's default");
+        }
+    }
+
+    public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + group)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + group)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        defaultReplicationFactor, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        defaultParitionCount, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
+        return configDef;
+    }
+
+    public static ConfigDef defaultGroupConfigDef() {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, ".*",
+                        new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        ConfigDef.NO_DEFAULT_VALUE, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics in " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        ConfigDef.NO_DEFAULT_VALUE, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " + DEFAULT_TOPIC_CREATION_GROUP);
+        return configDef;
+    }

Review comment:
       I'm in favor of including it, since it's well tested. We could reconsider consolidating in the future.





[GitHub] [kafka] kkonstantine merged pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #8722:
URL: https://github.com/apache/kafka/pull/8722


   



[GitHub] [kafka] rhauch commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429658260



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +250,16 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
+            + "source connectors to create topics with custom settings. If enabled, each connector "
+            + "task will use an admin clients to create its topics and will not depend on "
+            + "auto.create.topics.enable being set on Kafka brokers.";

Review comment:
       This seems to not match the proposed behavior, specifically with respect to source connectors having to opt in by setting at least `topic.creation.default.replication.factor` and `topic.creation.default.partitions`.
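   In other words, a source connector only gets topic creation by opting in with at least something like the following (the values are just an example):
   ```java
   // Example only: the minimal opt-in for the default topic creation group.
   Map<String, String> connectorProps = new HashMap<>();
   connectorProps.put("topic.creation.default.replication.factor", "3");
   connectorProps.put("topic.creation.default.partitions", "5");
   ```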

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list. $alias applies to any group defined in topic"
+            + ".creation.groups but not the default";

Review comment:
       I know you're following the text that was in the KIP, which made sense because `$alias` appeared in the property key rule in the table. However, as a user, the appearance of `$alias` here would be really hard to follow. I'd argue that since we're dynamically adding config keys for all topic creation group aliases, the doc will apply to a specific include key (e.g., `topic.creation.rule1.include`), so we should remove this sentence from the documentation of each of the generated config keys.
   ```suggestion
               + "that match this inclusion list.";
   ```
   
   Same with the other documentation constants in this class.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +159,18 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.isTopicCreationEnabled =
+                workerConfig.getBoolean(TOPIC_CREATION_ENABLE_CONFIG) && topicGroups != null;
+        if (isTopicCreationEnabled) {
+            this.defaultTopicGroup = topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP);
+            this.topicGroups = new LinkedHashMap<>(topicGroups);
+            this.topicGroups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            this.topicCache = new HashSet<>();
+        } else {
+            this.defaultTopicGroup = null;
+            this.topicGroups = Collections.emptyMap();
+            this.topicCache = Collections.emptySet();
+        }

Review comment:
       It might be worth encapsulating a lot of this logic -- and even some of the logic in `maybeCreateGroup` -- in a class that can be more easily tested.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.

Review comment:
       I don't think this is correct anymore, is it?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Are you keeping this blank for backward compatibility?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -802,6 +826,16 @@ public String workerId() {
         return workerId;
     }
 
+    /**
+     * Returns whether this worker is configured to allow source connectors to create the topics
+     * that they use with custom configurations, if these topics don't already exist.
+     *
+     * @return true if topic creation by source connectors is allowed; false otherwise
+     */
+    public boolean isTopicCreationEnabled() {
+        return config.getBoolean(TOPIC_CREATION_ENABLE_CONFIG);

Review comment:
       Why not use the new method on WorkerConfig instead?
   ```suggestion
           return config.topicCreationEnable();
   ```
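
   For context, the `topicCreationEnable()` accessor referenced in the suggestion is presumably just a thin wrapper on the worker property -- a sketch, assuming it lives on `WorkerConfig`:

   ```java
   // Sketch of the WorkerConfig accessor assumed by the suggestion above; it simply
   // reads the boolean topic.creation.enable worker property.
   public boolean topicCreationEnable() {
       return getBoolean(TOPIC_CREATION_ENABLE_CONFIG);
   }
   ```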

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,153 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
+     *
+     * @param baseConfigDef
+     * @param props
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0, TopicCreationConfig.configDef(
+                    configGroup,
+                    defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG),
+                    defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG)));

Review comment:
       Since these are the same for all `topicCreationGroups`, WDYT about pulling these out of this lambda? I think it would help with readability.
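
   Concretely, the hoisting suggested here could look roughly like the following sketch (the same calls as in the diff, just moved out of the lambda):

   ```java
   // Sketch: read the default group's settings once and reuse them for every alias.
   short defaultGroupReplicationFactor =
           defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
   int defaultGroupPartitions =
           defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);

   topicCreationGroups.stream().distinct().forEach(group -> {
       if (!(group instanceof String)) {
           throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
       }
       String alias = (String) group;
       String prefix = TOPIC_CREATION_PREFIX + alias + ".";
       String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
       newDef.embed(prefix, configGroup, 0, TopicCreationConfig.configDef(
               configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
   });
   ```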

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +417,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!isTopicCreationEnabled || topicCache.contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCache.add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicGroups.values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(defaultTopicGroup);
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);

Review comment:
       We need a `NewTopicCreationGroup.toString()` implementation for this log message to be useful.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +417,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!isTopicCreationEnabled || topicCache.contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCache.add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicGroups.values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(defaultTopicGroup);
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCache.add(topic);
+            log.info("Created topic '{}'", newTopic);

Review comment:
       WDYT about including the name of the topic creation group that was used (it's only included in debug logs so far) and the actual topic settings used to create the topic? Something like:
   ```suggestion
               log.info("Created topic '{}' using creation group '{}' and topic settings {}", newTopic, topicGroup, topicGroup.topicSettings());
   ```
   if the `NewTopicCreationGroup.toString()` method output the name only, or
   ```suggestion
               log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
   ```
   if the `NewTopicCreationGroup.toString()` method output the name and topic settings.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +159,18 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.isTopicCreationEnabled =
+                workerConfig.getBoolean(TOPIC_CREATION_ENABLE_CONFIG) && topicGroups != null;

Review comment:
       Use the WorkerConfig method:
   ```suggestion
           this.isTopicCreationEnabled = workerConfig.topicCreationEnable() && topicGroups != null;
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));
+            numPartitions = config.topicCreationPartitions(group);
+            replicationFactor = config.topicCreationReplicationFactor(group);
+            otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }

Review comment:
       This class needs a `toString()` method since instances of this class are used in log statements.
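
   A sketch of what such a `toString()` could look like, based on the fields visible in this diff (it also prints a group name, on the assumption that the constructor's `group` argument ends up stored as a field):

   ```java
   // Sketch of a toString() for NewTopicCreationGroup; assumes the group name is kept
   // as a field so that log statements can identify which creation group matched.
   @Override
   public String toString() {
       return "NewTopicCreationGroup{" +
               "name='" + name + '\'' +
               ", inclusionPattern=" + inclusionPattern +
               ", exclusionPattern=" + exclusionPattern +
               ", numPartitions=" + numPartitions +
               ", replicationFactor=" + replicationFactor +
               ", otherConfigs=" + otherConfigs +
               '}';
   }
   ```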

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
##########
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.LogReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Time.SYSTEM;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
+@PowerMockIgnore("javax.management.*")
+public class ErrorHandlingTaskWithTopicCreationTest {
+
+    private static final String TOPIC = "test";
+    private static final int PARTITION1 = 12;
+    private static final int PARTITION2 = 13;
+    private static final long FIRST_OFFSET = 45;
+
+    @Mock Plugins plugins;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+    }
+
+    public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000;
+    public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000;
+    public static final ToleranceType OPERATOR_TOLERANCE_TYPE = ToleranceType.ALL;
+
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private TargetState initialState = TargetState.STARTED;
+    private Time time;
+    private MockConnectMetrics metrics;
+    @SuppressWarnings("unused")
+    @Mock
+    private SinkTask sinkTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private SourceTask sourceTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    private SourceConnectorConfig sourceConfig;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @SuppressWarnings("unused")
+    @Mock
+    private HeaderConverter headerConverter;
+    private WorkerSinkTask workerSinkTask;
+    private WorkerSourceTask workerSourceTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaProducer<byte[], byte[]> producer;
+    @SuppressWarnings("unused")
+    @Mock private TopicAdmin admin;
+
+    @Mock
+    OffsetStorageReaderImpl offsetReader;
+    @Mock
+    OffsetStorageWriter offsetWriter;
+
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    @SuppressWarnings("unused")
+    @Mock
+    private TaskStatus.Listener statusListener;
+    @SuppressWarnings("unused")
+    @Mock private StatusBackingStore statusBackingStore;
+
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    // when this test becomes parameterized, this variable will be a test parameter
+    public boolean enableTopicCreation = true;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, 0, 0);
+        metrics = new MockConnectMetrics();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Why are we setting these and not relying upon the defaults? Is there value in explicitly setting these in this test?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));

Review comment:
       I guess we're lucky here that topic names can't contain `,` characters, so each regex can be split on `,` when the `topic.creation.<alias>.include` and `topic.creation.<alias>.exclude` properties are parsed as lists and then recombined into a single regex.

       Would it be simpler to use a single regex for each of the include and exclude properties rather than lists? Would that be *safer* -- and maybe a bit simpler in UX and implementation -- than the current proposal?
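
   To make the trade-off concrete, here is a small sketch of how the list form behaves once the values are re-joined, next to the single-regex alternative asked about above (topic names and pattern values are made up):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;

   public class IncludeRegexJoinExample {
       public static void main(String[] args) {
           // List form: "foo.*, bar-.*" is parsed as a list and recombined with '|'.
           List<String> include = Arrays.asList("foo.*", "bar-.*");
           Pattern joined = Pattern.compile(String.join("|", include));
           System.out.println(joined.matcher("foo-topic").matches());   // true
           System.out.println(joined.matcher("bar-topic").matches());   // true
           System.out.println(joined.matcher("other-topic").matches()); // false

           // Single-regex alternative: the user supplies the alternation directly,
           // so no splitting on ',' is needed at all.
           Pattern single = Pattern.compile("foo.*|bar-.*");
           System.out.println(single.matcher("foo-topic").matches());   // true
       }
   }
   ```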

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
##########
@@ -0,0 +1,1408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockNice;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Worker.class, Plugins.class})
+@PowerMockIgnore("javax.management.*")
+public class WorkerWithTopicCreationTest extends ThreadedTest {
+
+    private static final String CONNECTOR_ID = "test-connector";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+    private static final String WORKER_ID = "localhost:8083";
+    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
+
+    private Map<String, String> workerProps = new HashMap<>();
+    private WorkerConfig config;
+    private Worker worker;
+
+    private Map<String, String> defaultProducerConfigs = new HashMap<>();
+    private Map<String, String> defaultConsumerConfigs = new HashMap<>();
+
+    @Mock
+    private Plugins plugins;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
+    private DelegatingClassLoader delegatingLoader;
+    @Mock
+    private OffsetBackingStore offsetBackingStore;
+    @MockStrict
+    private TaskStatus.Listener taskStatusListener;
+    @MockStrict
+    private ConnectorStatus.Listener connectorStatusListener;
+
+    @Mock private Herder herder;
+    @Mock private StatusBackingStore statusBackingStore;
+    @Mock private Connector connector;
+    @Mock private ConnectorContext ctx;
+    @Mock private TestSourceTask task;
+    @Mock private WorkerSourceTask workerTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private Converter taskKeyConverter;
+    @Mock private Converter taskValueConverter;
+    @Mock private HeaderConverter taskHeaderConverter;
+    @Mock private ExecutorService executorService;
+    @MockNice private ConnectorConfig connectorConfig;
+    private String mockFileProviderTestId;
+    private Map<String, String> connectorProps;
+
+    // when this test becomes parameterized, this variable will be a test parameter
+    public boolean enableTopicCreation = true;
+
+    @Before
+    public void setup() {
+        super.setup();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Are these useful?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,153 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }

Review comment:
       Even though `SourceConnectorConfig` was an existing class without a unit test, we're adding a significant amount of non-trivial logic that we should unit test.
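
   A sketch of the kind of unit test this could get, exercising the static methods shown in this diff (JUnit 4 style, matching the other runtime tests; the property values and assertions are illustrative):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigException;
   import org.junit.Test;

   import static org.junit.Assert.assertTrue;
   import static org.junit.Assert.fail;

   public class SourceConnectorConfigSketchTest {

       @Test
       public void duplicateGroupAliasesShouldFailValidation() {
           Map<String, String> props = new HashMap<>();
           props.put("name", "example-source");
           props.put("connector.class", "ExampleSourceConnector");
           props.put("topic.creation.groups", "foo, foo");
           ConfigDef def = SourceConnectorConfig.configDef();
           try {
               def.parse(props);
               fail("Expected a ConfigException for duplicate topic creation group aliases");
           } catch (ConfigException e) {
               // expected: the groups validator rejects duplicate aliases
           }
       }

       @Test
       public void embedDefaultGroupShouldAddPrefixedDefaultKeys() {
           ConfigDef enriched = SourceConnectorConfig.embedDefaultGroup(SourceConnectorConfig.configDef());
           assertTrue(enriched.names().contains("topic.creation.default.replication.factor"));
           assertTrue(enriched.names().contains("topic.creation.default.partitions"));
       }
   }
   ```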

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.
+ */
+@Category(IntegrationTest.class)
+public class SourceConnectorsIntegrationTest {
+
+    private static final int NUM_WORKERS = 3;
+    private static final int NUM_TASKS = 1;
+    private static final String FOO_TOPIC = "foo-topic";
+    private static final String FOO_CONNECTOR = "foo-source";
+    private static final String BAR_TOPIC = "bar-topic";
+    private static final String BAR_CONNECTOR = "bar-source";
+    private static final String FOO_GROUP = "foo";
+    private static final String BAR_GROUP = "bar";
+    private static final int DEFAULT_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int DEFAULT_PARTITIONS = 1;
+    private static final int FOO_GROUP_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int FOO_GROUP_PARTITIONS = 9;
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
+                .maskExitProcedures(true); // true is the default, setting here as example
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testCreateTopic() throws InterruptedException {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+        Map<String, String> fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC);
+
+        // start a source connector
+        connect.configureConnector(FOO_CONNECTOR, fooProps);
+        fooProps.put(NAME_CONFIG, FOO_CONNECTOR);
+
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(fooProps.get(CONNECTOR_CLASS_CONFIG), fooProps, 0,
+                "Validating connector configuration produced an unexpected number or errors.");
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        connect.assertions().assertTopicsExist(FOO_TOPIC);
+        connect.assertions().assertTopicSettings(FOO_TOPIC, FOO_GROUP_REPLICATION_FACTOR, FOO_GROUP_PARTITIONS);
+    }
+
+    @Test
+    public void testSwitchingToTopicCreationEnabled() throws InterruptedException {

Review comment:
       How about an integration test that relies upon broker autocreation of topics?
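
   Such a test could reuse the helpers already in this file -- roughly along these lines (a sketch only; it assumes the same fixture fields and assertion helpers used in the tests above):

   ```java
       @Test
       public void testTopicsAutoCreatedByBrokerWhenTopicCreationIsDisabled() throws InterruptedException {
           // Sketch: disable Connect-side topic creation and lean on broker auto-creation instead.
           workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
           brokerProps.put("auto.create.topics.enable", String.valueOf(true));
           connect = connectBuilder.build();
           connect.start();

           connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

           Map<String, String> fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC);
           connect.configureConnector(FOO_CONNECTOR, fooProps);
           connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
                   "Connector tasks did not start in time.");

           // The topic should still appear, but with broker defaults rather than the group's settings.
           connect.assertions().assertTopicsExist(FOO_TOPIC);
       }
   ```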

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.
+ */
+@Category(IntegrationTest.class)
+public class SourceConnectorsIntegrationTest {
+
+    private static final int NUM_WORKERS = 3;
+    private static final int NUM_TASKS = 1;
+    private static final String FOO_TOPIC = "foo-topic";
+    private static final String FOO_CONNECTOR = "foo-source";
+    private static final String BAR_TOPIC = "bar-topic";
+    private static final String BAR_CONNECTOR = "bar-source";
+    private static final String FOO_GROUP = "foo";
+    private static final String BAR_GROUP = "bar";
+    private static final int DEFAULT_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int DEFAULT_PARTITIONS = 1;
+    private static final int FOO_GROUP_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int FOO_GROUP_PARTITIONS = 9;
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
+                .maskExitProcedures(true); // true is the default, setting here as example
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testCreateTopic() throws InterruptedException {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+        Map<String, String> fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC);
+
+        // start a source connector
+        connect.configureConnector(FOO_CONNECTOR, fooProps);
+        fooProps.put(NAME_CONFIG, FOO_CONNECTOR);
+
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(fooProps.get(CONNECTOR_CLASS_CONFIG), fooProps, 0,
+                "Validating connector configuration produced an unexpected number or errors.");
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        connect.assertions().assertTopicsExist(FOO_TOPIC);
+        connect.assertions().assertTopicSettings(FOO_TOPIC, FOO_GROUP_REPLICATION_FACTOR, FOO_GROUP_PARTITIONS);
+    }
+
+    @Test
+    public void testSwitchingToTopicCreationEnabled() throws InterruptedException {
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.kafka().createTopic(BAR_TOPIC, 1);
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

Review comment:
       Should we also assert that the topic actually exists before we start the connector? Otherwise, we might not be testing what we think we are.
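
   For instance, right after the topic is created in the test setup -- a short sketch using the same assertion helper as elsewhere in this file:

   ```java
           connect.kafka().createTopic(BAR_TOPIC, 1);
           // Fail fast if the precondition does not hold, so the rest of the test is meaningful.
           connect.assertions().assertTopicsExist(BAR_TOPIC);
   ```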







[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633730773









[GitHub] [kafka] rhauch commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429994168



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Yeah, I think it might help track down problems with admin principals.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);

Review comment:
       Another thing for potentially moving into `TopicCreation`, such as maybe adding an `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of `TopicCreation`.
   
   ```suggestion
               topicCreation.addTopic(topic);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
+            + "source connectors to create topics with custom settings. If enabled, in "
+            + "connectors that specify topic creation properties with the prefix `" + TOPIC_CREATION_PREFIX
+            + "` each task will use an admin client to create its topics and will not depend on "
+            + "auto.create.topics.enable being set on Kafka brokers.";

Review comment:
       Maybe:
   ```suggestion
       protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
               + "source connectors to create topics by specifying topic creation properties "
               + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will use an
               + "admin client to create its topics and will not depend on the Kafka brokers "
               + "to create topics automatically.";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }

Review comment:
       Nit: Since this is static, how about moving to before the member fields and methods?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);

Review comment:
       Another thing for potentially moving into `TopicCreation`, such as maybe adding an `addTopic(topic)` method. Doing so might allow you to hide most of the implementation details of `TopicCreation`.
   ```suggestion
               topicCreation.addTopic(topic);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());

Review comment:
       Couldn't this also be added to the `TopicCreation` class? Seems like it'd help unit test this logic in isolation, more easily covering the corner cases.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
+    }
+
+    public static class TopicCreation {

Review comment:
       Seems like this could be pulled out into `utils` (?), at which point it's also a lot easier to test more thoroughly.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }
+
+        public static TopicCreation empty() {
+            return EMPTY;
+        }

Review comment:
       Nit: Since this is static, how about moving to before the member fields and methods?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);
+            log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);

Review comment:
       `NewTopicCreationGroup` still needs a `toString()` method, since it's used in this log statement.
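   A minimal `toString()` sketch for `NewTopicCreationGroup`, using the field names shown later in this diff, that would make the log statement above meaningful:

```java
@Override
public String toString() {
    // Print the group name, matching patterns, and topic settings for this creation group.
    return "NewTopicCreationGroup{" +
            "name='" + name + '\'' +
            ", inclusionPattern=" + inclusionPattern +
            ", exclusionPattern=" + exclusionPattern +
            ", numPartitions=" + numPartitions +
            ", replicationFactor=" + replicationFactor +
            ", otherConfigs=" + otherConfigs +
            '}';
}
```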

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +192,76 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final String name;
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            this.name = group;
+            this.inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            this.exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));
+            this.numPartitions = config.topicCreationPartitions(group);
+            this.replicationFactor = config.topicCreationReplicationFactor(group);
+            this.otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }
+
+        public static Map<String, NewTopicCreationGroup> configuredGroups(SourceConnectorConfig config) {
+            List<String> groupNames = config.getList(TOPIC_CREATION_GROUPS_CONFIG);
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>();
+            for (String group : groupNames) {
+                groups.put(group, new NewTopicCreationGroup(group, config));
+            }
+            // Even if there was a group called 'default' in the config, it will be overriden here.
+            // Order matters for all the topic groups besides the default, since it will be
+            // removed from this collection by the Worker
+            groups.put(DEFAULT_TOPIC_CREATION_GROUP, new NewTopicCreationGroup(DEFAULT_TOPIC_CREATION_GROUP, config));
+            return groups;
+        }

Review comment:
       Nit: wouldn't this public static method be better before any of the member fields or methods?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCache().contains(topic)) {
+            return;
+        }

Review comment:
       Again, this logic could be moved into `TopicCreation` for easier testing and to make this code more readable:
   ```suggestion
           if (!topicCreation.isCreateTopicRequired(topic)) {
               return;
           }
   ```
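   Taken together with the earlier suggestions to add an `addTopic(topic)` method and to move the group-matching logic out of `maybeCreateTopic`, a consolidated `TopicCreation` API might look like the following sketch. The method names (`isTopicCreationRequired`, `findFirstGroup`, `addTopic`) are illustrative only, not taken from the final implementation:

```java
// Sketch of a TopicCreation facade combining the suggestions in this review.
// Assumes java.util.Map, java.util.Set, and TopicAdmin.NewTopicCreationGroup are imported.
public class TopicCreation {
    private final boolean isTopicCreationEnabled;
    private final NewTopicCreationGroup defaultTopicGroup;
    private final Map<String, NewTopicCreationGroup> topicGroups;
    private final Set<String> topicCache;

    private TopicCreation(boolean enabled, NewTopicCreationGroup defaultGroup,
                          Map<String, NewTopicCreationGroup> groups, Set<String> cache) {
        this.isTopicCreationEnabled = enabled;
        this.defaultTopicGroup = defaultGroup;
        this.topicGroups = groups;
        this.topicCache = cache;
    }

    /** True if topic creation is enabled and this topic has not been seen before. */
    public boolean isTopicCreationRequired(String topic) {
        return isTopicCreationEnabled && !topicCache.contains(topic);
    }

    /** The first group whose patterns match the topic, or the default group otherwise. */
    public NewTopicCreationGroup findFirstGroup(String topic) {
        return topicGroups.values().stream()
                .filter(group -> group.matches(topic))
                .findFirst()
                .orElse(defaultTopicGroup);
    }

    /** Record that the topic exists so it is not described or created again. */
    public void addTopic(String topic) {
        if (isTopicCreationEnabled) {
            topicCache.add(topic);
        }
    }
}
```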

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";

Review comment:
       This still is unclear, particularly what "match their values" means. Maybe:
   ```suggestion
        private static final String EXCLUDE_REGEX_DOC = "A list of regular expression literals "
                + "used to match the names of topics used by the source connector. This list is used "
                + "to exclude topics from being created with the topic settings defined by this group. "
                + "Note that exclusion rules take precedence and override any inclusion rules for the topics.";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       This still is unclear, particularly what "match their values" means. Maybe:
   ```suggestion
       private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "
               + "used to match the names topics used by the source connector. This list is used "
               + "to include topics that should be created using the topic settings defined by this group.";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";

Review comment:
       Minor tweaks:
   ```suggestion
        private static final String PARTITIONS_DOC = "The number of partitions for new topics created for "
               + "this connector. This value may be -1 to use the broker's default number of partitions, "
               + "or a positive number representing the desired number of partitions. "
               + "For the default group this configuration is required. For any "
               + "other group defined in topic.creation.groups this config is optional and if it's "
               + "missing it gets the value of the default group";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"

Review comment:
       WDYT?
   ```suggestion
           () -> "Positive number not larger than the number of brokers in the Kafka cluster, or -1 to use the broker's default"
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";

Review comment:
       Minor tweaks:
   ```suggestion
       private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
               + "created for this connector using this group. This value may be -1 to use the broker's"
               + "default replication factor, or may be a positive number not larger than the number of "
               + "brokers in the Kafka cluster. A value larger than the number of brokers in the Kafka cluster "
               + "will result in an error when the new topic is created. For the default group this configuration "
               + "is required. For any other group defined in topic.creation.groups this config is "
               + "optional and if it's missing it gets the value of the default group";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
+            + "source connectors to create topics by specifying topic creation properties "
+            + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will use an "
+            + "admin client to create its topics and will not depend on the Kafka brokers "
+            + "to create topics automatically.";

Review comment:
       The "source connectors to create topics by specifying..." seems strange, since source connectors don't actually create topics and even if they did they wouldn't do so by specifying anything. Maybe:
   ```suggestion
       protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow "
               + "automatic creation of topics used by source connectors, when source connector "
               + "are configured with `" + TOPIC_CREATION_PREFIX + "` properties. Each task will use an "
               + "admin client to create its topics and will not depend on the Kafka brokers "
               + "to create topics automatically.";
   ```
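   For reference, a hypothetical source connector configuration using these properties might look like the following. The group name, regexes, and values are made up for illustration; KIP-158 describes the full set of supported properties:

```properties
# Required defaults when topic creation is enabled
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
# An additional group with its own settings for matching topics
topic.creation.groups=inorder
topic.creation.inorder.include=status-.*
topic.creation.inorder.exclude=status-internal-.*
topic.creation.inorder.partitions=1
topic.creation.inorder.cleanup.policy=compact
```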

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
                 counter.retryRemaining();
                 return false;
+            } catch (ConnectException e) {
+                log.warn("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);

Review comment:
       This is a new log line, but it's similar to an existing one above. Nevertheless, this will write out the record's key and value to the log. We should instead only write the record coordinates.
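   A possible variant (a sketch, not the final wording) that logs only the record coordinates:

```java
// Hypothetical: log the destination topic and partition instead of the whole
// ProducerRecord, so the record's key and value never end up in the worker log.
log.warn("{} Failed to send record to topic '{}' and partition '{}' with unrecoverable exception: ",
        this, producerRecord.topic(), producerRecord.partition(), e);
```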

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in regular expression");

Review comment:
       Should we include the `PatternSyntaxException` or its message in the `ConfigException`? As it stands, it will be clear *that* the regex is invalid but not *why* the regex is invalid.
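   One possible sketch of surfacing the reason for the failure:

```java
// Sketch: include the regex engine's explanation so users can see why the pattern is invalid.
try {
    ((List<String>) value).forEach(Pattern::compile);
} catch (PatternSyntaxException e) {
    throw new ConfigException(name, value,
            "Syntax error in regular expression: " + e.getDescription());
}
```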

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);

Review comment:
       I know this line existed before, but this writes out the record's key and value to the log. We should instead only write the record coordinates.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new topics created for"
+            + " this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in regular expression");
+            }
+        },
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the broker's default");
+        }
+    }
+
+    public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + group)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, group, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + group)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        defaultReplicationFactor, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        defaultParitionCount, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
+        return configDef;
+    }
+
+    public static ConfigDef defaultGroupConfigDef() {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, ".*",
+                        new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        ConfigDef.NO_DEFAULT_VALUE, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics in " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        ConfigDef.NO_DEFAULT_VALUE, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " + DEFAULT_TOPIC_CREATION_GROUP);
+        return configDef;
+    }

Review comment:
       I'm not sure whether this is a good idea or not, but if `configDef(String, ...)` were changed to take `Object` for the `defaultReplicationFactor` and `defaultPartitionCount`, then this method could be replaced with:
   ```suggestion
       public static ConfigDef defaultGroupConfigDef() {
           return configDef(DEFAULT_TOPIC_CREATION_GROUP, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.NO_DEFAULT_VALUE);
       }
   ```
   Even though we'd lose a bit of type safety on the `configDef(...)` method, we'd more clearly show how the default is similar to the other rules.
   
   Another alternative to maintain `configDef(...)` type safety is to accept `Short` and `Integer`, use `NO_DEFAULT_VALUE` if the parameters are null, and then change this method to:
   ```
       public static ConfigDef defaultGroupConfigDef() {
           return configDef(DEFAULT_TOPIC_CREATION_GROUP, null, null);
       }
   ```
   
   Again, not sure it's worth doing this. Up to you.
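   For reference, a sketch of the first alternative, with `configDef(...)` accepting `Object` defaults; note the default group's `.*` include default and `NonNullValidator` usage would still need separate handling, so this is only an approximation of how the two methods could be unified:

```java
// Sketch: configDef(...) taking Object defaults so ConfigDef.NO_DEFAULT_VALUE can be passed.
public static ConfigDef configDef(String group, Object defaultReplicationFactor, Object defaultPartitionCount) {
    int orderInGroup = 2; // include/exclude are assumed to be defined first, as in the existing method
    return new ConfigDef()
            // ... include/exclude definitions unchanged ...
            .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, defaultReplicationFactor,
                    REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC,
                    group, ++orderInGroup, ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
            .define(PARTITIONS_CONFIG, ConfigDef.Type.INT, defaultPartitionCount,
                    PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, PARTITIONS_DOC,
                    group, ++orderInGroup, ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
}

public static ConfigDef defaultGroupConfigDef() {
    return configDef(DEFAULT_TOPIC_CREATION_GROUP, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.NO_DEFAULT_VALUE);
}
```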

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+            } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: ", this, producerRecord, e);

Review comment:
       It may be worth changing this line in a subsequent PR that can be backported.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,157 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
+    }
+
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the base {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non-parsed configuration properties
+     * @param defaultGroupConfig the parsed configuration of the default topic creation group
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    @Override
+    public Object get(String key) {

Review comment:
       I'm fine with not ordering all public static methods first, but I think you'll agree that having this member getter method appear before the constructor does not follow the conventions and patterns we use throughout the project.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to exclude topics that "
+            + "match their values and refrain from applying this group's specific configuration "
+            + "to the topics that match this exclusion list. Note that exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for new topics "
+            + "created for this connector. This value must not be larger than the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be thrown when the "
+            + "connector will attempt to create a topic. For the default group this configuration"
+            + " is required. For any other group defined in topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions for new topics created"
+            + " by this connector. For the default group this configuration is required. For any "
+            + "other group defined in topic.creation.groups this config is optional and if it's "
+            + "missing it falls back to the value of the default group.";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in regular expression");
+            }
+        },
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the broker's default");

Review comment:
       How about:
   ```suggestion
                       "Replication factor must be positive and not larger than the number of brokers in the Kafka cluster, or -1 to use the broker's default");
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
##########
@@ -0,0 +1,1490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreation;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
+    private static final String TOPIC = "topic";
+    private static final String OTHER_TOPIC = "other-topic";
+    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
+
+    // Connect-format data
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final Integer KEY = -1;
+    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+    private static final Long RECORD = 12L;
+    // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
+    // is used in the right place.
+    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
+
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+    private WorkerConfig config;
+    private SourceConnectorConfig sourceConfig;
+    private Plugins plugins;
+    private MockConnectMetrics metrics;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
+    @Mock private TransformationChain<SourceRecord> transformationChain;
+    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private TopicAdmin admin;
+    @Mock private CloseableOffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
+    private WorkerSourceTask workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
+    @Mock private StatusBackingStore statusBackingStore;
+
+    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+    );
+
+    // when this test becomes parameterized, this variable will be a test parameter
+    public boolean enableTopicCreation = true;
+
+    @Override
+    public void setup() {
+        super.setup();
+        Map<String, String> workerProps = workerProps();
+        plugins = new Plugins(workerProps);
+        config = new StandaloneConfig(workerProps);
+        sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
+        producerCallbacks = EasyMock.newCapture();
+        metrics = new MockConnectMetrics();
+    }
+
+    private Map<String, String> workerProps() {
+        Map<String, String> props = new HashMap<>();
+        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
+        return props;
+    }
+
+    private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "foo-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+        props.put(TOPIC_CONFIG, topic);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic);
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic);
+        return props;
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
+    private void createWorkerTask() {
+        createWorkerTask(TargetState.STARTED);
+    }
+
+    private void createWorkerTask(TargetState initialState) {
+        createWorkerTask(initialState, keyConverter, valueConverter, headerConverter);
+    }
+
+    private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
+                transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
+                offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+    }
+
+    @Test
+    public void testStartPaused() throws Exception {
+        final CountDownLatch pauseLatch = new CountDownLatch(1);
+
+        createWorkerTask(TargetState.PAUSED);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                pauseLatch.countDown();
+                return null;
+            }
+        });
+
+        expectClose();
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        AtomicInteger count = new AtomicInteger(0);
+        CountDownLatch pollLatch = expectPolls(10, count);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        expectTopicCreation(TOPIC);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+        assertTrue(awaitLatch(pollLatch));
+
+        workerTask.transitionTo(TargetState.PAUSED);
+
+        int priorCount = count.get();
+        Thread.sleep(100);
+
+        // since the transition is observed asynchronously, the count could be off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
+
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(10);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = new CountDownLatch(1);
+        final RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
+            @Override
+            public List<SourceRecord> answer() throws Throwable {
+                pollLatch.countDown();
+                throw exception;
+            }
+        });
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollReturnsNoRecords() throws Exception {
+        // Test that the task handles an empty list of records
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger());
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(false);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsPropagatesTimestamp() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(timestamp, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testSendRecordsCorruptTimestamp() throws Exception {
+        final Long timestamp = -3L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsNoTimestamp() throws Exception {
+        final Long timestamp = -1L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsRetries() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // First round
+        expectSendRecordOnce(false);
+        // Any Producer retriable exception should work here
+        expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+        // Second round
+        expectSendRecordOnce(true);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record2, record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerSendFailsImmediately() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        expectTopicCreation(TOPIC);
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject()))
+                .andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTaskCommitRecordFail() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // Source task commit record failure will not cause the task to abort
+        expectSendRecordOnce(false);
+        expectSendRecordTaskCommitRecordFail(false, false);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSlowTaskStart() throws Exception {
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch finishStartupLatch = new CountDownLatch(1);
+
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                startupLatch.countDown();
+                assertTrue(awaitLatch(finishStartupLatch));
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> workerTaskFuture = executor.submit(workerTask);
+
+        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+        // cannot be invoked immediately in the thread trying to stop the task.
+        assertTrue(awaitLatch(startupLatch));
+        workerTask.stop();
+        finishStartupLatch.countDown();
+        assertTrue(workerTask.awaitStop(1000));
+
+        workerTaskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancel() {
+        createWorkerTask();
+
+        offsetReader.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.cancel();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testMetricsGroup() {
+        SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordPoll(100, 1000 + i * 100);
+            group.recordWrite(10);
+        }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
+        assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
+        assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
+        assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
+    }
+
+    @Test
+    public void testHeaders() throws Exception {
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders();
+        connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
+
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertEquals(headers, sent.getValue().headers());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() throws Exception {
+        StringConverter stringConverter = new StringConverter();
+        TestConverterWithHeaders testConverter = new TestConverterWithHeaders();
+
+        createWorkerTask(TargetState.STARTED, stringConverter, testConverter, stringConverter);
+
+        List<SourceRecord> records = new ArrayList<>();
+
+        String stringA = "Árvíztűrő tükörfúrógép";
+        org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
+        String encodingA = "latin2";
+        headersA.addString("encoding", encodingA);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
+
+        String stringB = "Тестовое сообщение";
+        org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
+        String encodingB = "koi8_r";
+        headersB.addString("encoding", encodingB);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+
+        assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringA.getBytes(encodingA)),
+            ByteBuffer.wrap(sentRecordA.getValue().value())
+        );
+        assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
+
+        assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringB.getBytes(encodingB)),
+            ByteBuffer.wrap(sentRecordB.getValue().value())
+        );
+        assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTopicCreateWhenTopicExists() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        // First round - call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round - calls to describe and create succeed
+        expectTopicCreation(TOPIC);
+        // Exactly two records are sent
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First call to describe the topic times out
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        // First call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
+        // First call to create the topic times out
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3 records. The first two should pass and the third should fail, leaving only the last record to retry
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicDescribeFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsEnabled() {
+        TopicCreationGroup expectedDefaultGroup =
+                TopicCreationGroup.configuredGroups(sourceConfig).get(DEFAULT_TOPIC_CREATION_GROUP);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(TOPIC));
+        assertThat(topicCreation.defaultTopicGroup(), is(expectedDefaultGroup));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems("foo", "bar"));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsDisabled() {
+        Map<String, String> workerProps = workerProps();
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        config = new StandaloneConfig(workerProps);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testEmptyTopicCreationClass() {
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config, null);
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }

Review comment:
       This test class is already fairly complex. Can these move to a new `TopicCreationTest` class to correspond to the new `TopicCreation` class?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -166,11 +176,14 @@ protected void close() {
                 log.warn("Could not close producer", t);
             }
         }
-        try {
-            transformationChain.close();
-        } catch (Throwable t) {
-            log.warn("Could not close transformation chain", t);
+        if (admin != null) {
+            try {
+                admin.close(Duration.ofSeconds(30));
+            } catch (Throwable t) {
+                log.warn("Failed to close admin client on time", t);
+            }

Review comment:
       Should this have a `finally` block that nulls the `admin` field?
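   For illustration, a minimal sketch of the suggested change, assuming the `admin` field is not declared `final` and is not needed again after `close()`:

       if (admin != null) {
           try {
               admin.close(Duration.ofSeconds(30));
           } catch (Throwable t) {
               log.warn("Failed to close admin client on time", t);
           } finally {
               // Clear the reference so a closed client cannot be reused by later calls
               admin = null;
           }
       }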

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,167 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
         super(plugins, config, props);
+        if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
+            ConfigDef defaultConfigDef = embedDefaultGroup(config);
+            // This config is only used to set default values for partitions and replication
+            // factor from the default group and otherwise it remains unused
+            AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, props, false);
+
+            // If the user has added regex of include or exclude patterns in the default group,
+            // they should be ignored.

Review comment:
       Nice catch.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "
+            + "used to match the names topics used by the source connector. This list is used "
+            + "to include topics that should be created using the topic settings defined by this group.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular expression literals "

Review comment:
       This is misnamed, and that was my fault when I made a wrong suggestion earlier.
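   For clarity, a sketch of what the rename likely looks like after the fix (the exact doc wording here is an assumption, not a quote from the PR):

       public static final String EXCLUDE_REGEX_CONFIG = "exclude";
       private static final String EXCLUDE_REGEX_DOC = "A list of regular expression literals "
               + "used to match the names of topics used by the source connector. This list is used "
               + "to exclude topics from being created with the topic settings defined by this group.";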




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429666952



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.

Review comment:
       Copy paste is a necessary evil :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633247308


   Rebased to resolve conflicts


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633345893


   Rebased just to resolve conflicts with https://github.com/apache/kafka/pull/2604


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-634430019


   jdk8: success
   jdk11: only two failures on a known flaky streams test (`EosBetaUpgradeIntegrationTest`)
   jdk14: a single failure on another unrelated streams test (`KTableSourceTopicRestartIntegrationTest`)
   
   Given these results I'll go ahead and merge this PR. 
   Thanks again for the reviews @rhauch !
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633282441


   Thanks for your comments @rhauch !
   I'll be addressing them shortly. I just added unit tests for the source connector configs and I'll be adding a few more plus the integration tests that you mentioned. 
   
   I'll ping you here when it's ready for another pass. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633335198


   Results of the latest completed build: 
   jdk8: success
   jdk11: the known flaky `EosBetaUpgradeIntegrationTest` (EOS beta) failure again
   jdk14: success
   Coverage is now quite high, but I'm considering adding a few more tests. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429650887



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -193,15 +192,6 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
-    private static final Validator REPLICATION_FACTOR_VALIDATOR = LambdaValidator.with(

Review comment:
       These had to be moved out of this class, or else tests for `StandaloneHerder` would break. They are now reused here as well as in `TopicCreationConfig`

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -193,15 +192,6 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
-    private static final Validator REPLICATION_FACTOR_VALIDATOR = LambdaValidator.with(

Review comment:
       These had to be moved out of this class, or else tests for `StandaloneHerder` would break. They are now reused here as well as `TopicCreationConfig`
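   As a rough illustration of the kind of validator that can now be shared between `DistributedConfig` and `TopicCreationConfig` (the name, error message, and the -1 semantics are assumptions in this sketch):

       public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = ConfigDef.LambdaValidator.with(
           (name, value) -> {
               short factor = (Short) value;
               // -1 means "use the broker's default"; any other value must be positive
               if (factor != -1 && factor < 1) {
                   throw new ConfigException(name, value, "Value must be positive, or -1 to use the broker's default");
               }
           },
           () -> "Positive number, or -1 to use the broker's default");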




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429677323



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Yes, indeed. You think we should set them instead?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +250,16 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, it allows "
+            + "source connectors to create topics with custom settings. If enabled, each connector "
+            + "task will use an admin clients to create its topics and will not depend on "
+            + "auto.create.topics.enable being set on Kafka brokers.";

Review comment:
       I added the additional info, but maybe there's still room for improvement. 
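   To make the end-to-end behavior concrete, a hedged example of the configuration this setting enables, in the style of the integration tests in this PR (the property keys for partitions and replication factor follow KIP-158 and are assumptions here, not quotes from this diff):

       Map<String, String> workerProps = new HashMap<>();
       workerProps.put("topic.creation.enable", "true");

       Map<String, String> connectorProps = new HashMap<>();
       connectorProps.put("topic.creation.groups", "compacted");
       connectorProps.put("topic.creation.default.replication.factor", "3");
       connectorProps.put("topic.creation.default.partitions", "5");
       connectorProps.put("topic.creation.compacted.include", "configurations\\..*");
       connectorProps.put("topic.creation.compacted.cleanup.policy", "compact");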

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));
+            numPartitions = config.topicCreationPartitions(group);
+            replicationFactor = config.topicCreationReplicationFactor(group);
+            otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }

Review comment:
       Added. 

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.
+ */
+@Category(IntegrationTest.class)
+public class SourceConnectorsIntegrationTest {
+
+    private static final int NUM_WORKERS = 3;
+    private static final int NUM_TASKS = 1;
+    private static final String FOO_TOPIC = "foo-topic";
+    private static final String FOO_CONNECTOR = "foo-source";
+    private static final String BAR_TOPIC = "bar-topic";
+    private static final String BAR_CONNECTOR = "bar-source";
+    private static final String FOO_GROUP = "foo";
+    private static final String BAR_GROUP = "bar";
+    private static final int DEFAULT_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int DEFAULT_PARTITIONS = 1;
+    private static final int FOO_GROUP_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int FOO_GROUP_PARTITIONS = 9;
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
+                .maskExitProcedures(true); // true is the default, setting here as example
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testCreateTopic() throws InterruptedException {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+        Map<String, String> fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC);
+
+        // start a source connector
+        connect.configureConnector(FOO_CONNECTOR, fooProps);
+        fooProps.put(NAME_CONFIG, FOO_CONNECTOR);
+
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(fooProps.get(CONNECTOR_CLASS_CONFIG), fooProps, 0,
+                "Validating connector configuration produced an unexpected number or errors.");
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        connect.assertions().assertTopicsExist(FOO_TOPIC);
+        connect.assertions().assertTopicSettings(FOO_TOPIC, FOO_GROUP_REPLICATION_FACTOR, FOO_GROUP_PARTITIONS);
+    }
+
+    @Test
+    public void testSwitchingToTopicCreationEnabled() throws InterruptedException {

Review comment:
       Not a bad idea. Good thing is that this type of test runs fast. Added.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +417,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!isTopicCreationEnabled || topicCache.contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCache.add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicGroups.values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(defaultTopicGroup);
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);

Review comment:
       👍 Added

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -802,6 +826,16 @@ public String workerId() {
         return workerId;
     }
 
+    /**
+     * Returns whether this worker is configured to allow source connectors to create the topics
+     * that they use with custom configurations, if these topics don't already exist.
+     *
+     * @return true if topic creation by source connectors is allowed; false otherwise
+     */
+    public boolean isTopicCreationEnabled() {
+        return config.getBoolean(TOPIC_CREATION_ENABLE_CONFIG);

Review comment:
       All I can think of is that I added this one here first, thinking I might not need a similar method in the config. Fixed. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = "topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that represent regular "
+            + "expressions that may match topic names. This list is used to include topics that "
+            + "match their values and apply this group's specific configuration to the topics "
+            + "that match this inclusion list. $alias applies to any group defined in topic"
+            + ".creation.groups but not the default";

Review comment:
       Good point. We could always substitute with `String.format` but since it doesn't add much, I've removed it here and below. 
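   For reference, a short sketch of the `String.format` substitution that was considered but dropped (hypothetical names; this code was not added to the PR):

       private static final String INCLUDE_REGEX_DOC_TEMPLATE = "A list of strings that represent regular "
               + "expressions that may match topic names. The group '%s' applies to any group defined in "
               + "topic.creation.groups but not the default.";

       static String includeRegexDoc(String groupAlias) {
           return String.format(INCLUDE_REGEX_DOC_TEMPLATE, groupAlias);
       }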

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +159,18 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.isTopicCreationEnabled =
+                workerConfig.getBoolean(TOPIC_CREATION_ENABLE_CONFIG) && topicGroups != null;
+        if (isTopicCreationEnabled) {
+            this.defaultTopicGroup = topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP);
+            this.topicGroups = new LinkedHashMap<>(topicGroups);
+            this.topicGroups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            this.topicCache = new HashSet<>();
+        } else {
+            this.defaultTopicGroup = null;
+            this.topicGroups = Collections.emptyMap();
+            this.topicCache = Collections.emptySet();
+        }

Review comment:
       Good point. Refactored and added a few simple tests. 
   This class can be used for more assertions eventually, but this will require a refactor of the constructor arguments, which I'd suggest addressing separately.
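   For context, a rough sketch of the refactored helper, inferred from how the new tests later in this thread use it (the method names match those tests; the internals shown here are assumptions, not the actual implementation):

       public class TopicCreation {
           private final boolean isTopicCreationEnabled;
           private final TopicCreationGroup defaultTopicGroup;
           private final Map<String, TopicCreationGroup> topicGroups;
           private final Set<String> topicCache;

           // private constructor elided

           public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
                                                        Map<String, TopicCreationGroup> topicGroups) {
               if (!workerConfig.getBoolean(TOPIC_CREATION_ENABLE_CONFIG) || topicGroups == null) {
                   return new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet());
               }
               Map<String, TopicCreationGroup> groups = new LinkedHashMap<>(topicGroups);
               TopicCreationGroup defaultGroup = groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
               return new TopicCreation(true, defaultGroup, groups, new HashSet<>());
           }

           public boolean isTopicCreationEnabled() { return isTopicCreationEnabled; }

           public TopicCreationGroup defaultTopicGroup() { return defaultTopicGroup; }

           public Map<String, TopicCreationGroup> topicGroups() { return topicGroups; }

           public boolean isTopicCreationRequired(String topic) {
               return isTopicCreationEnabled && !topicCache.contains(topic);
           }

           public void addTopic(String topic) {
               if (isTopicCreationEnabled) {
                   topicCache.add(topic);
               }
           }
       }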

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.SourceConnectorConfig;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFAULT_NUM_BROKERS;
+
+/**
+ * Integration test for the endpoints that offer topic tracking of a connector's active
+ * topics.
+ */
+@Category(IntegrationTest.class)
+public class SourceConnectorsIntegrationTest {
+
+    private static final int NUM_WORKERS = 3;
+    private static final int NUM_TASKS = 1;
+    private static final String FOO_TOPIC = "foo-topic";
+    private static final String FOO_CONNECTOR = "foo-source";
+    private static final String BAR_TOPIC = "bar-topic";
+    private static final String BAR_CONNECTOR = "bar-source";
+    private static final String FOO_GROUP = "foo";
+    private static final String BAR_GROUP = "bar";
+    private static final int DEFAULT_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int DEFAULT_PARTITIONS = 1;
+    private static final int FOO_GROUP_REPLICATION_FACTOR = DEFAULT_NUM_BROKERS;
+    private static final int FOO_GROUP_PARTITIONS = 9;
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Connect worker properties
+        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
+                .maskExitProcedures(true); // true is the default, setting here as example
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testCreateTopic() throws InterruptedException {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
+
+        Map<String, String> fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC);
+
+        // start a source connector
+        connect.configureConnector(FOO_CONNECTOR, fooProps);
+        fooProps.put(NAME_CONFIG, FOO_CONNECTOR);
+
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(fooProps.get(CONNECTOR_CLASS_CONFIG), fooProps, 0,
+                "Validating connector configuration produced an unexpected number or errors.");
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        connect.assertions().assertTopicsExist(FOO_TOPIC);
+        connect.assertions().assertTopicSettings(FOO_TOPIC, FOO_GROUP_REPLICATION_FACTOR, FOO_GROUP_PARTITIONS);
+    }
+
+    @Test
+    public void testSwitchingToTopicCreationEnabled() throws InterruptedException {
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.kafka().createTopic(BAR_TOPIC, 1);
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");

Review comment:
       Added that assertion and I also changed `createTopic` to pass specific properties. 
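   A hedged sketch of what that change could look like in the test setup (the exact `createTopic` overload on the embedded Kafka cluster is an assumption):

       Map<String, String> topicConfig = new HashMap<>();
       topicConfig.put("cleanup.policy", "compact");
       // topic name, partitions, replication factor, topic-level configs
       connect.kafka().createTopic(BAR_TOPIC, DEFAULT_PARTITIONS, DEFAULT_REPLICATION_FACTOR, topicConfig);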

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +417,40 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!isTopicCreationEnabled || topicCache.contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCache.add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = topicGroups.values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(defaultTopicGroup);
+        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCache.add(topic);
+            log.info("Created topic '{}'", newTopic);

Review comment:
       👍 went with option 2. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig config) {
+            inclusionPattern = Pattern.compile(String.join("|", config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", config.topicCreationExclude(group)));

Review comment:
       Indeed, and I think that if something is permitted by a definition that is not expected to change any time soon, we should be free to use it. 
   
   The good news with the current approach is that if you want to encode your regex as a single pattern, this config will accept it _as is_. Given that, I'd prefer to keep it as a list since it might be slightly simpler to write or extend, while still being able to use a single regex without change if you prefer. 
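   To illustrate the point, a small sketch of why a single pre-joined regex keeps working when the config is parsed as a list (this mirrors the `String.join("|", ...)` shown in the hunk above):

       // "foo-.*,bar-.*" parsed as a LIST config yields two literals that are joined with '|'
       Pattern fromList = Pattern.compile(String.join("|", Arrays.asList("foo-.*", "bar-.*")));
       // a user who prefers a single regex can supply it as a one-element list, unchanged
       Pattern single = Pattern.compile(String.join("|", Collections.singletonList("foo-.*|bar-.*")));
       // both patterns match the same topic names
       boolean sameForFoo = fromList.matcher("foo-topic").matches() == single.matcher("foo-topic").matches(); // true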




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r429666897



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,153 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, "Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }

Review comment:
       Very true. I've been adding tests there in the meantime and I've decided to put the tests for topic groups there as well. I just pushed the commit with several tests. Will add a few more for multiple groups (for now the tests exercise the default or one group plus the default). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine edited a comment on pull request #8722: KAFKA-5295: Allow source connectors to specify topic-specific settings for new topics (KIP-158)

Posted by GitBox <gi...@apache.org>.
kkonstantine edited a comment on pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#issuecomment-633282441


   Thanks for your comments @rhauch !
   I'll be addressing them shortly. I just added unit tests for the source connector configs and I'll be adding a few more plus the integration tests that you mentioned. 
   
   I'll ping you here when it's ready for another pass. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org