You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/02 14:34:55 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r888026291


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -576,88 +672,42 @@ public boolean startTask(
 
             executor.submit(workerTask);
             if (workerTask instanceof WorkerSourceTask) {
-                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+                sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
             }
             return true;
         }
     }
 
-    private WorkerTask buildWorkerTask(ClusterConfigState configState,
-                                       ConnectorConfig connConfig,
-                                       ConnectorTaskId id,
-                                       Task task,
-                                       TaskStatus.Listener statusListener,
-                                       TargetState initialState,
-                                       Converter keyConverter,
-                                       Converter valueConverter,
-                                       HeaderConverter headerConverter,
-                                       ClassLoader loader) {
-        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-        final Class<? extends Connector> connectorClass = plugins.connectorClass(
-            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        // Decide which type of worker task we need based on the type of task.
-        if (task instanceof SourceTask) {
-            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-                    connConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            TopicAdmin admin;
-            Map<String, TopicCreationGroup> topicCreationGroups;
-            if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-                Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-                        sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-                admin = new TopicAdmin(adminProps);
-                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-            } else {
-                admin = null;
-                topicCreationGroups = null;
-            }
-
-            // Note we pass the configState as it performs dynamic transformations under the covers
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-        } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
-            log.info("Initializing: {}", transformationChain);
-            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-                    keyConverter, valueConverter, headerConverter);
-
-            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
-                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                                      retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
-            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
-        }
+    static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+                                                              WorkerConfig config,
+                                                              ConnectorConfig connConfig,
+                                                              Class<? extends Connector>  connectorClass,
+                                                              ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                                              String clusterId) {
+        Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        ConnectUtils.ensureProperty(
+                result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+                "for connectors when exactly-once source support is enabled",
+                false
+        );
+        String transactionalId = transactionalId(config.groupId(), id.connector(), id.task());

Review Comment:
   👍 added a comment and updated the method name.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+    private boolean transactionOpen;
+    private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords;
+
+    private final TransactionManager transactionManager;
+    private final TransactionMetricsGroup transactionMetrics;
+
+    private final ConnectorOffsetBackingStore offsetBackingStore;
+    private final Runnable preProducerCheck;
+    private final Runnable postProducerCheck;
+
+    public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+                                       SourceTask task,
+                                       TaskStatus.Listener statusListener,
+                                       TargetState initialState,
+                                       Converter keyConverter,
+                                       Converter valueConverter,
+                                       HeaderConverter headerConverter,
+                                       TransformationChain<SourceRecord> transformationChain,
+                                       Producer<byte[], byte[]> producer,
+                                       TopicAdmin admin,
+                                       Map<String, TopicCreationGroup> topicGroups,
+                                       CloseableOffsetStorageReader offsetReader,
+                                       OffsetStorageWriter offsetWriter,
+                                       ConnectorOffsetBackingStore offsetBackingStore,
+                                       WorkerConfig workerConfig,
+                                       ClusterConfigState configState,
+                                       ConnectMetrics connectMetrics,
+                                       ClassLoader loader,
+                                       Time time,
+                                       RetryWithToleranceOperator retryWithToleranceOperator,
+                                       StatusBackingStore statusBackingStore,
+                                       SourceConnectorConfig sourceConfig,
+                                       Executor closeExecutor,
+                                       Runnable preProducerCheck,
+                                       Runnable postProducerCheck) {
+        super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+                new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
+                producer, admin, topicGroups, offsetReader, offsetWriter, offsetBackingStore, workerConfig, connectMetrics,
+                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+
+        this.transactionOpen = false;
+        this.commitableRecords = new LinkedHashMap<>();
+        this.offsetBackingStore = offsetBackingStore;
+
+        this.preProducerCheck = preProducerCheck;
+        this.postProducerCheck = postProducerCheck;
+
+        this.transactionManager = buildTransactionManager(workerConfig, sourceConfig, sourceTaskContext.transactionContext());
+        this.transactionMetrics = new TransactionMetricsGroup(id, connectMetrics);
+    }
+
+    private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConfig) {
+        return TransactionBoundary.CONNECTOR.equals(sourceConfig.transactionBoundary())
+                ? new WorkerTransactionContext()
+                : null;
+    }
+
+    @Override
+    protected void prepareToInitializeTask() {
+        preProducerCheck.run();
+
+        // Try not to start up the offset store (which has its own producer and consumer) if we've already been shut down at this point
+        if (isStopping())
+            return;
+        offsetBackingStore.start();
+
+        // Try not to initialize the transactional producer (which may accidentally fence out other, later task generations) if we've already
+        // been shut down at this point
+        if (isStopping())
+            return;
+        producer.initTransactions();
+
+        postProducerCheck.run();
+    }
+
+    @Override
+    protected void prepareToEnterSendLoop() {
+        transactionManager.initialize();
+    }
+
+    @Override
+    protected void beginSendIteration() {
+        // No-op
+    }
+
+    @Override
+    protected void prepareToPollTask() {
+        // No-op
+    }
+
+    @Override
+    protected void recordDropped(SourceRecord record) {
+        synchronized (this) {
+            commitableRecords.put(record, null);
+        }
+        transactionManager.maybeCommitTransactionForRecord(record);
+    }
+
+    @Override
+    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+            SourceRecord sourceRecord,
+            ProducerRecord<byte[], byte[]> producerRecord
+    ) {
+        if (offsetBackingStore.primaryOffsetsTopic().equals(producerRecord.topic())) {
+            // This is to prevent deadlock that occurs when:
+            //     1. A task provides a record whose topic is the task's offsets topic
+            //     2. That record is dispatched to the task's producer in a transaction that remains open
+            //        at least until the worker polls the task again
+            //     3. In the subsequent call to SourceTask::poll, the task requests offsets from the worker
+            //        (which requires a read to the end of the offsets topic, and will block until any open
+            //        transactions on the topic are either committed or aborted)
+            throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
+        }
+        maybeBeginTransaction();
+        return Optional.empty();
+    }
+
+    @Override
+    protected void recordDispatched(SourceRecord record) {
+        // Offsets are converted & serialized in the OffsetWriter
+        // Important: we only save offsets for the record after it has been accepted by the producer; this way,
+        // we commit those offsets if and only if the record is sent successfully.
+        offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+        transactionMetrics.addRecord();
+        transactionManager.maybeCommitTransactionForRecord(record);
+    }
+
+    @Override
+    protected void batchDispatched() {
+        transactionManager.maybeCommitTransactionForBatch();
+    }
+
+    @Override
+    protected void recordSent(
+            SourceRecord sourceRecord,
+            ProducerRecord<byte[], byte[]> producerRecord,
+            RecordMetadata recordMetadata
+    ) {
+        synchronized (this) {
+            commitableRecords.put(sourceRecord, recordMetadata);
+        }
+    }
+
+    @Override
+    protected void producerSendFailed(
+            boolean synchronous,
+            ProducerRecord<byte[], byte[]> producerRecord,
+            SourceRecord preTransformRecord,
+            Exception e
+    ) {
+        if (synchronous) {
+            throw maybeWrapProducerSendException(
+                    "Unrecoverable exception trying to send",
+                    e
+            );
+        } else {
+            // No-op; all asynchronously-reported producer exceptions should be bubbled up again by Producer::commitTransaction
+        }
+    }
+
+    @Override
+    protected void finalOffsetCommit(boolean failed) {
+        if (failed) {
+            log.debug("Skipping final offset commit as task has failed");
+            return;
+        }
+
+        // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
+        // send loop since we only track source offsets for records that have been successfully dispatched to the
+        // producer.
+        // Any records that we were retrying on (and any records after them in the batch) won't be included in the
+        // transaction and their offsets won't be committed, but (unless the user has requested connector-defined
+        // transaction boundaries), it's better to commit some data than none.
+        transactionManager.maybeCommitFinalTransaction();
+    }
+
+    @Override
+    protected void onPause() {
+        super.onPause();
+        // Commit the transaction now so that we don't end up with a hanging transaction, or worse, get fenced out
+        // and fail the task once unpaused
+        transactionManager.maybeCommitFinalTransaction();
+    }
+
+    private void maybeBeginTransaction() {
+        if (!transactionOpen) {
+            producer.beginTransaction();
+            transactionOpen = true;
+        }
+    }
+
+    private void commitTransaction() {
+        log.debug("{} Committing offsets", this);
+
+        long started = time.milliseconds();
+
+        // We might have just aborted a transaction, in which case we'll have to begin a new one
+        // in order to commit offsets
+        maybeBeginTransaction();
+
+        AtomicReference<Throwable> flushError = new AtomicReference<>();
+        Future<Void> offsetFlush = null;
+        if (offsetWriter.beginFlush()) {
+            // Now we can actually write the offsets to the internal topic.
+            offsetFlush = offsetWriter.doFlush((error, result) -> {
+                if (error != null) {
+                    log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
+                    flushError.compareAndSet(null, error);
+                } else {
+                    log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
+                }
+            });
+        }
+
+        // Commit the transaction
+        // Blocks until all outstanding records have been sent and ack'd
+        try {
+            producer.commitTransaction();
+            if (offsetFlush != null) {
+                // Although it's guaranteed by the above call to Producer::commitTransaction that all outstanding
+                // records for the task's producer (including those sent to the offsets topic) have been delivered and
+                // ack'd, there is no guarantee that the producer callbacks for those records have been completed. So,
+                // we add this call to Future::get to ensure that these callbacks are invoked successfully before
+                // proceeding.
+                offsetFlush.get();
+            }
+        } catch (Throwable t) {
+            flushError.compareAndSet(null, t);
+        }
+
+        transactionOpen = false;
+
+        Throwable error = flushError.get();
+        if (error != null) {
+            recordCommitFailure(time.milliseconds() - started, null);
+            offsetWriter.cancelFlush();
+            throw maybeWrapProducerSendException(
+                    "Failed to flush offsets and/or records for task " + id,
+                    error
+            );
+        }
+
+        transactionMetrics.commitTransaction();
+
+        long durationMillis = time.milliseconds() - started;
+        recordCommitSuccess(durationMillis);
+        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+        // No need for any synchronization here; all other accesses to this field take place in producer callbacks,
+        // which should all be completed by this point

Review Comment:
   That's a fair point. Right now this is a result of the threading model of the producer, which blocks on the completion of all in-flight batches and their callbacks before sending a request to the broker to end a transaction. This is not technically guaranteed by the documented behavior for `Producer::commitTransaction`, though; the only guarantees we get from the [Javadocs](https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction()) are:
   > This method will flush any unsent records before actually committing the transaction. Further, if any of the [send(ProducerRecord)](https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord)) calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed.
   
   If we don't want to rely on this technically-undocumented behavior, we can add some more bookkeeping to ensure this ourselves. Alternatively, we can rely on this behavior and I can remove the comment above about producer callback guarantees and the bookkeeping we already do on that front (which I've done for now).
   
   @hachikuji @guozhangwang do you have thoughts about whether it should be part of the contract for `Producer::commitTransaction` that all user-supplied callbacks for records within the transaction are fired before the method returns? The use case is tracking record metadata that's only available after the record is ack'd and then reporting it to source tasks, before clearing a local cache of that metadata.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +871,10 @@ private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskI
         return clientOverrides;
     }
 
+    public static String transactionalId(String groupId, String connector, int taskId) {

Review Comment:
   Ack, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org