Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/08 01:28:29 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

hachikuji commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891835221


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+    private boolean transactionOpen;
+    private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords;
+
+    private final TransactionManager transactionManager;
+    private final TransactionMetricsGroup transactionMetrics;
+
+    private final ConnectorOffsetBackingStore offsetBackingStore;
+    private final Runnable preProducerCheck;
+    private final Runnable postProducerCheck;
+
+    public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+                                       SourceTask task,
+                                       TaskStatus.Listener statusListener,
+                                       TargetState initialState,
+                                       Converter keyConverter,
+                                       Converter valueConverter,
+                                       HeaderConverter headerConverter,
+                                       TransformationChain<SourceRecord> transformationChain,
+                                       Producer<byte[], byte[]> producer,
+                                       TopicAdmin admin,
+                                       Map<String, TopicCreationGroup> topicGroups,
+                                       CloseableOffsetStorageReader offsetReader,
+                                       OffsetStorageWriter offsetWriter,
+                                       ConnectorOffsetBackingStore offsetBackingStore,
+                                       WorkerConfig workerConfig,
+                                       ClusterConfigState configState,
+                                       ConnectMetrics connectMetrics,
+                                       ClassLoader loader,
+                                       Time time,
+                                       RetryWithToleranceOperator retryWithToleranceOperator,
+                                       StatusBackingStore statusBackingStore,
+                                       SourceConnectorConfig sourceConfig,
+                                       Executor closeExecutor,
+                                       Runnable preProducerCheck,
+                                       Runnable postProducerCheck) {
+        super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+                new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
+                producer, admin, topicGroups, offsetReader, offsetWriter, offsetBackingStore, workerConfig, connectMetrics,
+                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+
+        this.transactionOpen = false;
+        this.commitableRecords = new LinkedHashMap<>();
+        this.offsetBackingStore = offsetBackingStore;
+
+        this.preProducerCheck = preProducerCheck;
+        this.postProducerCheck = postProducerCheck;
+
+        this.transactionManager = buildTransactionManager(workerConfig, sourceConfig, sourceTaskContext.transactionContext());
+        this.transactionMetrics = new TransactionMetricsGroup(id, connectMetrics);
+    }
+
+    private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConfig) {
+        return TransactionBoundary.CONNECTOR.equals(sourceConfig.transactionBoundary())
+                ? new WorkerTransactionContext()
+                : null;
+    }
+
+    @Override
+    protected void prepareToInitializeTask() {
+        preProducerCheck.run();
+
+        // Try not to start up the offset store (which has its own producer and consumer) if we've already been shut down at this point
+        if (isStopping())
+            return;
+        offsetBackingStore.start();
+
+        // Try not to initialize the transactional producer (which may accidentally fence out other, later task generations) if we've already
+        // been shut down at this point
+        if (isStopping())
+            return;
+        producer.initTransactions();
+
+        postProducerCheck.run();
+    }
+
+    @Override
+    protected void prepareToEnterSendLoop() {
+        transactionManager.initialize();
+    }
+
+    @Override
+    protected void beginSendIteration() {
+        // No-op
+    }
+
+    @Override
+    protected void prepareToPollTask() {
+        // No-op
+    }
+
+    @Override
+    protected void recordDropped(SourceRecord record) {
+        synchronized (this) {
+            commitableRecords.put(record, null);
+        }
+        transactionManager.maybeCommitTransactionForRecord(record);
+    }
+
+    @Override
+    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+            SourceRecord sourceRecord,
+            ProducerRecord<byte[], byte[]> producerRecord
+    ) {
+        if (offsetBackingStore.primaryOffsetsTopic().equals(producerRecord.topic())) {
+            // This is to prevent a deadlock that occurs when:
+            //     1. A task provides a record whose topic is the task's offsets topic
+            //     2. That record is dispatched to the task's producer in a transaction that remains open
+            //        at least until the worker polls the task again
+            //     3. In the subsequent call to SourceTask::poll, the task requests offsets from the worker
+            //        (which requires a read to the end of the offsets topic, and will block until any open
+            //        transactions on the topic are either committed or aborted)
+            throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
+        }
+        maybeBeginTransaction();
+        return Optional.empty();
+    }
+
+    @Override
+    protected void recordDispatched(SourceRecord record) {
+        // Offsets are converted & serialized in the OffsetWriter
+        // Important: we only save offsets for the record after it has been accepted by the producer; this way,
+        // we commit those offsets if and only if the record is sent successfully.
+        offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+        transactionMetrics.addRecord();
+        transactionManager.maybeCommitTransactionForRecord(record);
+    }
+
+    @Override
+    protected void batchDispatched() {
+        transactionManager.maybeCommitTransactionForBatch();
+    }
+
+    @Override
+    protected void recordSent(
+            SourceRecord sourceRecord,
+            ProducerRecord<byte[], byte[]> producerRecord,
+            RecordMetadata recordMetadata
+    ) {
+        synchronized (this) {
+            commitableRecords.put(sourceRecord, recordMetadata);
+        }
+    }
+
+    @Override
+    protected void producerSendFailed(
+            boolean synchronous,
+            ProducerRecord<byte[], byte[]> producerRecord,
+            SourceRecord preTransformRecord,
+            Exception e
+    ) {
+        if (synchronous) {
+            throw maybeWrapProducerSendException(
+                    "Unrecoverable exception trying to send",
+                    e
+            );
+        } else {
+            // No-op; all asynchronously-reported producer exceptions should be bubbled up again by Producer::commitTransaction
+        }
+    }
+
+    @Override
+    protected void finalOffsetCommit(boolean failed) {
+        if (failed) {
+            log.debug("Skipping final offset commit as task has failed");
+            return;
+        }
+
+        // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
+        // send loop since we only track source offsets for records that have been successfully dispatched to the
+        // producer.
+        // Any records that we were retrying on (and any records after them in the batch) won't be included in the
+        // transaction and their offsets won't be committed, but (unless the user has requested connector-defined
+        // transaction boundaries) it's better to commit some data than none.
+        transactionManager.maybeCommitFinalTransaction();
+    }
+
+    @Override
+    protected void onPause() {
+        super.onPause();
+        // Commit the transaction now so that we don't end up with a hanging transaction, or worse, get fenced out
+        // and fail the task once unpaused
+        transactionManager.maybeCommitFinalTransaction();
+    }
+
+    private void maybeBeginTransaction() {
+        if (!transactionOpen) {
+            producer.beginTransaction();
+            transactionOpen = true;
+        }
+    }
+
+    private void commitTransaction() {
+        log.debug("{} Committing offsets", this);
+
+        long started = time.milliseconds();
+
+        // We might have just aborted a transaction, in which case we'll have to begin a new one
+        // in order to commit offsets
+        maybeBeginTransaction();
+
+        AtomicReference<Throwable> flushError = new AtomicReference<>();
+        Future<Void> offsetFlush = null;
+        if (offsetWriter.beginFlush()) {
+            // Now we can actually write the offsets to the internal topic.
+            offsetFlush = offsetWriter.doFlush((error, result) -> {
+                if (error != null) {
+                    log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
+                    flushError.compareAndSet(null, error);
+                } else {
+                    log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
+                }
+            });
+        }
+
+        // Commit the transaction
+        // Blocks until all outstanding records have been sent and ack'd
+        try {
+            producer.commitTransaction();
+            if (offsetFlush != null) {
+                // Although it's guaranteed by the above call to Producer::commitTransaction that all outstanding
+                // records for the task's producer (including those sent to the offsets topic) have been delivered and
+                // ack'd, there is no guarantee that the producer callbacks for those records have been completed. So,
+                // we add this call to Future::get to ensure that these callbacks are invoked successfully before
+                // proceeding.
+                offsetFlush.get();
+            }
+        } catch (Throwable t) {
+            flushError.compareAndSet(null, t);
+        }
+
+        transactionOpen = false;
+
+        Throwable error = flushError.get();
+        if (error != null) {
+            recordCommitFailure(time.milliseconds() - started, null);
+            offsetWriter.cancelFlush();
+            throw maybeWrapProducerSendException(
+                    "Failed to flush offsets and/or records for task " + id,
+                    error
+            );
+        }
+
+        transactionMetrics.commitTransaction();
+
+        long durationMillis = time.milliseconds() - started;
+        recordCommitSuccess(durationMillis);
+        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+        // No need for any synchronization here; all other accesses to this field take place in producer callbacks,
+        // which should all be completed by this point

Review Comment:
   Yeah, I think it's a documentation gap. It is intended for callbacks to be invoked before `EndTxn` is sent. It would be a little strange otherwise. Worth noting that the callback is just for notification. Even if the callback raises an exception, we won't stop an active commit.
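
   A minimal standalone sketch of the semantics described above (illustrative only, not code from this PR; the bootstrap server, topic name, and transactional ID are placeholders):

   ```java
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;

   import java.util.Properties;

   public class TxnCallbackDemo {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id"); // placeholder

           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.initTransactions();
               producer.beginTransaction();
               producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                   // Per the discussion above, this callback is intended to be invoked before
                   // EndTxn is sent. It is notification-only: throwing from here does not stop
                   // an active commit.
                   if (exception != null) {
                       System.err.println("send failed: " + exception);
                   } else {
                       System.out.println("acked at offset " + metadata.offset());
                   }
               });
               // Flushes and waits for all outstanding sends before finalizing the transaction,
               // so (per the intended behavior discussed above) the callback has run by the
               // time this call returns.
               producer.commitTransaction();
           }
       }
   }
   ```

   Because the callback cannot fail the commit, the worker code in the diff above still checks the offset-flush future (`offsetFlush.get()`) after `Producer::commitTransaction` in order to surface any error that the flush callback recorded.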


