You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/17 11:24:01 UTC

[GitHub] [kafka] mimaison commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r874678140


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -271,8 +278,10 @@ void doShutdown() {
             state = State.FAILED;
             statusListener.onFailure(connName, t);
         } finally {
-            ctx.close();
-            metrics.close();
+            Utils.closeQuietly(ctx, "Connector context for " + connName);
+            Utils.closeQuietly(metrics, "Connector metrics for " + connName);
+            Utils.closeQuietly(offsetStorageReader, "Offset reader for " + connName);
+            Utils.closeQuietly(offsetStore::stop, "offset backing store for " + connName);

Review Comment:
   Nit: All other messages are capitalized



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -229,4 +229,4 @@ private void expectStore(Map<String, Object> key, byte[] keySerialized,
         });
     }
 
-}
+}

Review Comment:
   Can we undo this?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
 
             executor.submit(workerTask);
             if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
             }
             return true;
         }
     }
 
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                                      retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
-        }
+    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                              WorkerConfig config,
+                                                              ConnectorConfig connConfig,
+                                                              Class<? extends Connector>  connectorClass,
+                                                              ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                              String clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), id.connector(), id.task());

Review Comment:
   Should we use `DistributedConfig.transactionalProducerId()` here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = plugins.connectorClass(
+                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic creation
+                Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");

Review Comment:
   Are you planning to address this `TODO` in one of the other PRs? Same below in `ExactlyOnceSourceTaskBuilder`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java:
##########
@@ -597,101 +323,4 @@ public String toString() {
                 '}';
     }
 
-    protected void recordPollReturned(int numRecordsInBatch, long duration) {
-        sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
-    }
-
-    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
-        return sourceTaskMetricsGroup;
-    }
-
-    static class SourceRecordWriteCounter {
-        private final SourceTaskMetricsGroup metricsGroup;
-        private final int batchSize;
-        private boolean completed = false;
-        private int counter;
-        public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
-            assert batchSize > 0;
-            assert metricsGroup != null;
-            this.batchSize = batchSize;
-            counter = batchSize;
-            this.metricsGroup = metricsGroup;
-        }
-        public void skipRecord() {
-            if (counter > 0 && --counter == 0) {
-                finishedAllWrites();
-            }
-        }
-        public void completeRecord() {
-            if (counter > 0 && --counter == 0) {
-                finishedAllWrites();
-            }
-        }
-        public void retryRemaining() {
-            finishedAllWrites();
-        }
-        private void finishedAllWrites() {
-            if (!completed) {
-                metricsGroup.recordWrite(batchSize - counter);
-                completed = true;
-            }
-        }
-    }
-
-    static class SourceTaskMetricsGroup {
-        private final MetricGroup metricGroup;
-        private final Sensor sourceRecordPoll;
-        private final Sensor sourceRecordWrite;
-        private final Sensor sourceRecordActiveCount;
-        private final Sensor pollTime;
-        private int activeRecordCount;
-
-        public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
-            ConnectMetricsRegistry registry = connectMetrics.registry();
-            metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
-                    registry.connectorTagName(), id.connector(),
-                    registry.taskTagName(), Integer.toString(id.task()));
-            // remove any previously created metrics in this group to prevent collisions.
-            metricGroup.close();
-
-            sourceRecordPoll = metricGroup.sensor("source-record-poll");
-            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
-            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());
-
-            sourceRecordWrite = metricGroup.sensor("source-record-write");
-            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
-            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());
-
-            pollTime = metricGroup.sensor("poll-batch-time");
-            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
-            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
-
-            sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
-            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
-            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
-            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
-        }
-
-        void close() {
-            metricGroup.close();
-        }
-
-        void recordPoll(int batchSize, long duration) {
-            sourceRecordPoll.record(batchSize);
-            pollTime.record(duration);
-            activeRecordCount += batchSize;
-            sourceRecordActiveCount.record(activeRecordCount);
-        }
-
-        void recordWrite(int recordCount) {
-            sourceRecordWrite.record(recordCount);
-            activeRecordCount -= recordCount;
-            activeRecordCount = Math.max(0, activeRecordCount);
-            sourceRecordActiveCount.record(activeRecordCount);
-        }
-
-        protected MetricGroup metricGroup() {
-            return metricGroup;
-        }
-    }
-}
+}

Review Comment:
   Nit: Let's keep the new line



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = plugins.connectorClass(
+                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic creation
+                Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            // Create a topic admin that the task will use for its offsets topic and, potentially, automatic topic creation
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            TopicAdmin topicAdmin = new TopicAdmin(adminOverrides);
+
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+            } else {
+                topicCreationGroups = null;
+            }
+
+            Map<String, Object> producerProps = exactlyOnceSourceTaskProducerConfigs(

Review Comment:
   Nit: can we move this block above the admin block above? so it follows the same flow than `SourceTaskBuilder.doBuild()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org