You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/17 12:31:41 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r874752916


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+        // Fluent builder for WorkerTask instances: build() performs the setup common to all task types, subclasses implement doBuild().
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+        // The fields below start out null, are populated through the with*() methods, and are all required by build().
+        private Task task = null;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+        /** Captures the values that build() later forwards to doBuild() unchanged. */
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+        /** Each with*() method stores its argument and returns this builder so that calls can be chained. */
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+        /** Verifies that every with*() value was supplied (NullPointerException otherwise), prepares the shared error-handling objects, and delegates to doBuild(). */
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
+            // Setup shared by every task type; errorHandlingMetrics(...) and plugins are not declared in this class and come from the enclosing scope.
+            ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = plugins.connectorClass(
+                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); // NOTE(review): Time.SYSTEM is used here while the sink builder passes a `time` reference to its task; confirm this is intended.
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+        }
+        /** Creates the concrete WorkerTask from the builder's fields plus the error-handling objects prepared by build(). */
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder { // TaskBuilder variant whose doBuild() returns a WorkerSinkTask.
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+        /** Assembles the transformation chain, error reporters and a KafkaConsumer, then constructs the WorkerSinkTask. */
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+            // The same retryWithToleranceOperator instance is handed to the transformation chain, the errant record reporter and the task itself.
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); // Sink-specific config rebuilt from the connector's original string properties.
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+            // NOTE(review): the consumer below is created before the WorkerSinkTask constructor runs and is not closed in this method if that constructor throws; confirm who owns cleanup.
+            Map<String, Object> consumerProps = baseConsumerConfigs(
+                    id.connector(),  "connector-consumer-" + id, config, connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+            // The (SinkTask) cast throws ClassCastException if the supplied task is not a SinkTask.
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+            TopicAdmin topicAdmin;
+            Map<String, TopicCreationGroup> topicCreationGroups;
+            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+                // Create a topic admin that the task can use for topic creation
+                Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                topicAdmin = new TopicAdmin(adminOverrides);
+            } else {
+                topicAdmin = null;
+                topicCreationGroups = null;
+            }
+
+            // Set up the offset backing store for this task instance
+            ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");

Review Comment:
   Yes; per-connector offsets topics are implemented in https://github.com/apache/kafka/pull/11781.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org