You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/06/13 14:25:41 UTC
[kafka] branch trunk updated: KAFKA-10000: Exactly-once source tasks (#11780)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e8ef8bb31 KAFKA-10000: Exactly-once source tasks (#11780)
9e8ef8bb31 is described below
commit 9e8ef8bb317599c184ce8201d494edf109d9c528
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Mon Jun 13 07:25:29 2022 -0700
KAFKA-10000: Exactly-once source tasks (#11780)
Reviewers: Mickael Maison <mi...@gmail.com>, Tom Bentley <tb...@redhat.com>
---
build.gradle | 3 +-
...urceTask.java => AbstractWorkerSourceTask.java} | 579 ++++-----
.../runtime/ExactlyOnceWorkerSourceTask.java | 522 ++++++++
.../kafka/connect/runtime/SubmittedRecords.java | 64 +-
.../org/apache/kafka/connect/runtime/Worker.java | 558 +++++++--
.../kafka/connect/runtime/WorkerConnector.java | 21 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 565 ++-------
.../connect/runtime/WorkerSourceTaskContext.java | 19 +-
.../apache/kafka/connect/runtime/WorkerTask.java | 2 +-
.../connect/runtime/WorkerTransactionContext.java | 109 ++
.../runtime/distributed/DistributedHerder.java | 58 +-
.../runtime/standalone/StandaloneHerder.java | 32 +-
.../storage/ConnectorOffsetBackingStore.java | 71 ++
.../connect/storage/OffsetStorageReaderImpl.java | 1 +
.../kafka/connect/storage/OffsetStorageWriter.java | 13 +-
.../runtime/AbstractWorkerSourceTaskTest.java | 842 +++++++++++++
.../connect/runtime/ErrorHandlingTaskTest.java | 11 +-
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 1322 ++++++++++++++++++++
.../connect/runtime/SubmittedRecordsTest.java | 12 +-
.../kafka/connect/runtime/WorkerConnectorTest.java | 88 +-
.../connect/runtime/WorkerSourceTaskTest.java | 548 +-------
.../apache/kafka/connect/runtime/WorkerTest.java | 237 +++-
.../runtime/WorkerTransactionContextTest.java | 110 ++
.../runtime/distributed/DistributedHerderTest.java | 42 +-
.../runtime/standalone/StandaloneHerderTest.java | 14 +-
gradle/spotbugs-exclude.xml | 12 +-
26 files changed, 4276 insertions(+), 1579 deletions(-)
diff --git a/build.gradle b/build.gradle
index e34010c166..fd3b58b27e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -407,7 +407,8 @@ subprojects {
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerConfigTransformerTest.*", "**/WorkerGroupMemberTest.*",
- "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*", "**/WorkerSourceTaskTest.*",
+ "**/WorkerTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
+ "**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", "**/ExactlyOnceWorkerSourceTaskTest.*",
"**/WorkerTaskTest.*",
// streams tests
"**/KafkaStreamsTest.*"
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
similarity index 60%
copy from connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
copy to connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 43bf64c6af..3a805bdd7d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -18,7 +18,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
@@ -35,15 +36,13 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
-import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
-import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
@@ -59,78 +58,173 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
- * WorkerTask that uses a SourceTask to ingest data into Kafka.
+ * WorkerTask that contains shared logic for running source tasks with either standard or exactly-once delivery guarantees.
*/
-class WorkerSourceTask extends WorkerTask {
- private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
+public abstract class AbstractWorkerSourceTask extends WorkerTask {
+ private static final Logger log = LoggerFactory.getLogger(AbstractWorkerSourceTask.class);
private static final long SEND_FAILED_BACKOFF_MS = 100;
- private final WorkerConfig workerConfig;
+ /**
+ * Hook to define custom startup behavior before calling {@link SourceTask#initialize(SourceTaskContext)}
+ * and {@link SourceTask#start(Map)}.
+ */
+ protected abstract void prepareToInitializeTask();
+
+ /**
+ * Hook to define custom initialization behavior when preparing to begin the poll-convert-send loop for the first time,
+ * or when re-entering the loop after being paused.
+ */
+ protected abstract void prepareToEnterSendLoop();
+
+ /**
+ * Hook to define custom periodic behavior to be performed at the top of every iteration of the poll-convert-send loop.
+ */
+ protected abstract void beginSendIteration();
+
+ /**
+ * Hook to define custom periodic checks for health, metrics, etc. Called whenever {@link SourceTask#poll()} is about to be invoked.
+ */
+ protected abstract void prepareToPollTask();
+
+ /**
+ * Invoked when a record provided by the task has been filtered out by a transform or the converter,
+ * or will be discarded due to failures during transformation or conversion.
+ * @param record the pre-transform record that has been dropped; never null.
+ */
+ protected abstract void recordDropped(SourceRecord record);
+
+ /**
+ * Invoked when a record is about to be dispatched to the producer. May be invoked multiple times for the same
+ * record if retriable errors are encountered.
+ * @param sourceRecord the pre-transform {@link SourceRecord} provided by the source task; never null.
+ * @param producerRecord the {@link ProducerRecord} produced by transforming and converting the
+ * {@code sourceRecord}; never null.
+ * @return a {@link SubmittedRecords.SubmittedRecord} to be {@link SubmittedRecords.SubmittedRecord#ack() acknowledged}
+ * if the corresponding producer record is ack'd by Kafka or {@link SubmittedRecords.SubmittedRecord#drop() dropped}
+ * if synchronously rejected by the producer. Can also be {@link Optional#empty()} if it is not necessary to track the acknowledgment
+ * of individual producer records
+ */
+ protected abstract Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord
+ );
+
+ /**
+ * Invoked when a record has been transformed, converted, and dispatched to the producer successfully via
+ * {@link Producer#send}. Does not guarantee that the record has been sent to Kafka or ack'd by the required number
+ * of brokers, but does guarantee that it will never be re-processed.
+ * @param record the pre-transform {@link SourceRecord} that was successfully dispatched to the producer; never null.
+ */
+ protected abstract void recordDispatched(SourceRecord record);
+
+ /**
+ * Invoked when an entire batch of records returned from {@link SourceTask#poll} has been transformed, converted,
+ * and either discarded due to transform/conversion errors, filtered by a transform, or dispatched to the producer
+ * successfully via {@link Producer#send}. Does not guarantee that the records have been sent to Kafka or ack'd by the
+ * required number of brokers, but does guarantee that none of the records in the batch will ever be re-processed during
+ * the lifetime of this task. At most one record batch is polled from the task in between calls to this method.
+ */
+ protected abstract void batchDispatched();
+
+ /**
+ * Invoked when a record has been sent and ack'd by the Kafka cluster. Note that this method may be invoked
+ * concurrently and should therefore be made thread-safe.
+ * @param sourceRecord the pre-transform {@link SourceRecord} that was successfully sent to Kafka; never null.
+ * @param producerRecord the {@link ProducerRecord} produced by transforming and converting the
+ * {@code sourceRecord}; never null.
+ * @param recordMetadata the {@link RecordMetadata} for the corresponding producer record; never null.
+ */
+ protected abstract void recordSent(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ RecordMetadata recordMetadata
+ );
+
+ /**
+ * Invoked when a record given to {@link Producer#send(ProducerRecord, Callback)} has failed with a non-retriable error.
+ * @param synchronous whether the error occurred during the invocation of {@link Producer#send(ProducerRecord, Callback)}.
+ * If {@code false}, indicates that the error was reported asynchronously by the producer via a {@link Callback}
+ * @param producerRecord the {@link ProducerRecord} that the producer failed to send; never null
+ * @param preTransformRecord the pre-transform {@link SourceRecord} that the producer record was derived from; never null
+ * @param e the exception that was either thrown from {@link Producer#send(ProducerRecord, Callback)}, or reported by the producer
+ * via {@link Callback} after the call to {@link Producer#send(ProducerRecord, Callback)} completed
+ */
+ protected abstract void producerSendFailed(
+ boolean synchronous,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ SourceRecord preTransformRecord,
+ Exception e
+ );
+
+ /**
+ * Invoked when no more records will be polled from the task or dispatched to the producer. Should attempt to
+ * commit the offsets for any outstanding records when possible.
+ * @param failed whether the task is undergoing a healthy or an unhealthy shutdown
+ */
+ protected abstract void finalOffsetCommit(boolean failed);
+
+
+ protected final WorkerConfig workerConfig;
+ protected final WorkerSourceTaskContext sourceTaskContext;
+ protected final OffsetStorageWriter offsetWriter;
+ protected final Producer<byte[], byte[]> producer;
+
private final SourceTask task;
- private final ClusterConfigState configState;
private final Converter keyConverter;
private final Converter valueConverter;
private final HeaderConverter headerConverter;
private final TransformationChain<SourceRecord> transformationChain;
- private final KafkaProducer<byte[], byte[]> producer;
private final TopicAdmin admin;
private final CloseableOffsetStorageReader offsetReader;
- private final OffsetStorageWriter offsetWriter;
- private final Executor closeExecutor;
+ private final ConnectorOffsetBackingStore offsetStore;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
- private final AtomicReference<Exception> producerSendException;
- private final boolean isTopicTrackingEnabled;
- private final TopicCreation topicCreation;
-
- private List<SourceRecord> toSend;
- private volatile CommittableOffsets committableOffsets;
- private final SubmittedRecords submittedRecords;
private final CountDownLatch stopRequestedLatch;
+ private final boolean topicTrackingEnabled;
+ private final TopicCreation topicCreation;
+ private final Executor closeExecutor;
- private Map<String, String> taskConfig;
- private boolean started = false;
-
- public WorkerSourceTask(ConnectorTaskId id,
- SourceTask task,
- TaskStatus.Listener statusListener,
- TargetState initialState,
- Converter keyConverter,
- Converter valueConverter,
- HeaderConverter headerConverter,
- TransformationChain<SourceRecord> transformationChain,
- KafkaProducer<byte[], byte[]> producer,
- TopicAdmin admin,
- Map<String, TopicCreationGroup> topicGroups,
- CloseableOffsetStorageReader offsetReader,
- OffsetStorageWriter offsetWriter,
- WorkerConfig workerConfig,
- ClusterConfigState configState,
- ConnectMetrics connectMetrics,
- ClassLoader loader,
- Time time,
- RetryWithToleranceOperator retryWithToleranceOperator,
- StatusBackingStore statusBackingStore,
- Executor closeExecutor) {
+ // Visible for testing
+ List<SourceRecord> toSend;
+ protected Map<String, String> taskConfig;
+ protected boolean started = false;
+
+ protected AbstractWorkerSourceTask(ConnectorTaskId id,
+ SourceTask task,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ TransformationChain<SourceRecord> transformationChain,
+ WorkerSourceTaskContext sourceTaskContext,
+ Producer<byte[], byte[]> producer,
+ TopicAdmin admin,
+ Map<String, TopicCreationGroup> topicGroups,
+ CloseableOffsetStorageReader offsetReader,
+ OffsetStorageWriter offsetWriter,
+ ConnectorOffsetBackingStore offsetStore,
+ WorkerConfig workerConfig,
+ ConnectMetrics connectMetrics,
+ ClassLoader loader,
+ Time time,
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ StatusBackingStore statusBackingStore,
+ Executor closeExecutor) {
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);
this.workerConfig = workerConfig;
this.task = task;
- this.configState = configState;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.headerConverter = headerConverter;
@@ -139,15 +233,13 @@ class WorkerSourceTask extends WorkerTask {
this.admin = admin;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
+ this.offsetStore = offsetStore;
this.closeExecutor = closeExecutor;
+ this.sourceTaskContext = sourceTaskContext;
- this.toSend = null;
- this.committableOffsets = CommittableOffsets.EMPTY;
- this.submittedRecords = new SubmittedRecords();
this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
- this.producerSendException = new AtomicReference<>();
- this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.topicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
}
@@ -162,40 +254,24 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
- protected void close() {
- if (started) {
- try {
- task.stop();
- } catch (Throwable t) {
- log.warn("Could not stop task", t);
- }
- }
-
- closeProducer(Duration.ofSeconds(30));
-
- if (admin != null) {
- try {
- admin.close(Duration.ofSeconds(30));
- } catch (Throwable t) {
- log.warn("Failed to close admin client on time", t);
- }
- }
- Utils.closeQuietly(transformationChain, "transformation chain");
- Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
- }
-
- @Override
- public void removeMetrics() {
- try {
- sourceTaskMetricsGroup.close();
- } finally {
- super.removeMetrics();
- }
+ protected void initializeAndStart() {
+ prepareToInitializeTask();
+ // If we try to start the task at all by invoking initialize, then count this as
+ // "started" and expect a subsequent call to the task's stop() method
+ // to properly clean up any resources allocated by its initialize() or
+ // start() methods. If the task throws an exception during stop(),
+ // the worst thing that happens is another exception gets logged for an already-
+ // failed task
+ started = true;
+ task.initialize(sourceTaskContext);
+ task.start(taskConfig);
+ log.info("{} Source task finished initialization and start", this);
}
@Override
public void cancel() {
super.cancel();
+ // Preemptively close the offset reader in case the task is blocked on an offset read.
offsetReader.close();
// We proactively close the producer here as the main work thread for the task may
// be blocked indefinitely in a call to Producer::send if automatic topic creation is
@@ -214,36 +290,53 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
- protected void initializeAndStart() {
- // If we try to start the task at all by invoking initialize, then count this as
- // "started" and expect a subsequent call to the task's stop() method
- // to properly clean up any resources allocated by its initialize() or
- // start() methods. If the task throws an exception during stop(),
- // the worst thing that happens is another exception gets logged for an already-
- // failed task
- started = true;
- task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
- task.start(taskConfig);
- log.info("{} Source task finished initialization and start", this);
+ public void removeMetrics() {
+ Utils.closeQuietly(sourceTaskMetricsGroup::close, "source task metrics tracker");
+ super.removeMetrics();
+ }
+
+ @Override
+ protected void close() {
+ if (started) {
+ Utils.closeQuietly(task::stop, "source task");
+ }
+
+ closeProducer(Duration.ofSeconds(30));
+
+ if (admin != null) {
+ Utils.closeQuietly(() -> admin.close(Duration.ofSeconds(30)), "source task admin");
+ }
+ Utils.closeQuietly(transformationChain, "transformation chain");
+ Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ Utils.closeQuietly(offsetReader, "offset reader");
+ Utils.closeQuietly(offsetStore::stop, "offset backing store");
+ }
+
+ private void closeProducer(Duration duration) {
+ if (producer != null) {
+ Utils.closeQuietly(() -> producer.close(duration), "source task producer");
+ }
}
@Override
public void execute() {
try {
- log.info("{} Executing source task", this);
+ prepareToEnterSendLoop();
while (!isStopping()) {
- updateCommittableOffsets();
+ beginSendIteration();
if (shouldPause()) {
onPause();
if (awaitUnpause()) {
onResume();
+ prepareToEnterSendLoop();
}
continue;
}
- maybeThrowProducerSendException();
if (toSend == null) {
+ prepareToPollTask();
+
log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
long start = time.milliseconds();
toSend = poll();
@@ -251,91 +344,26 @@ class WorkerSourceTask extends WorkerTask {
recordPollReturned(toSend.size(), time.milliseconds() - start);
}
}
-
if (toSend == null)
continue;
log.trace("{} About to send {} records to Kafka", this, toSend.size());
- if (!sendRecords())
+ if (sendRecords()) {
+ batchDispatched();
+ } else {
stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ }
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
- } finally {
- submittedRecords.awaitAllMessages(
- workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
- TimeUnit.MILLISECONDS
- );
- // It should still be safe to commit offsets since any exception would have
- // simply resulted in not getting more records but all the existing records should be ok to flush
- // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
- // to fail.
- updateCommittableOffsets();
- commitOffsets();
- }
- }
-
- private void closeProducer(Duration duration) {
- if (producer != null) {
+ } catch (RuntimeException e) {
try {
- producer.close(duration);
- } catch (Throwable t) {
- log.warn("Could not close producer for {}", id, t);
+ finalOffsetCommit(true);
+ } catch (Exception offsetException) {
+ log.error("Failed to commit offsets for already-failing task", offsetException);
}
+ throw e;
}
- }
-
- private void maybeThrowProducerSendException() {
- if (producerSendException.get() != null) {
- throw new ConnectException(
- "Unrecoverable exception from producer send callback",
- producerSendException.get()
- );
- }
- }
-
- private void updateCommittableOffsets() {
- CommittableOffsets newOffsets = submittedRecords.committableOffsets();
- synchronized (this) {
- this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
- }
- }
-
- protected List<SourceRecord> poll() throws InterruptedException {
- try {
- return task.poll();
- } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
- log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
- // Do nothing. Let the framework poll whenever it's ready.
- return null;
- }
- }
-
- /**
- * Convert the source record into a producer record.
- *
- * @param record the transformed record
- * @return the producer record which can sent over to Kafka. A null is returned if the input is null or
- * if an error was encountered during any of the converter stages.
- */
- private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
- if (record == null) {
- return null;
- }
-
- RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
-
- byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
- Stage.KEY_CONVERTER, keyConverter.getClass());
-
- byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
- Stage.VALUE_CONVERTER, valueConverter.getClass());
-
- if (retryWithToleranceOperator.failed()) {
- return null;
- }
-
- return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
- ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
+ finalOffsetCommit(false);
}
/**
@@ -343,62 +371,53 @@ class WorkerSourceTask extends WorkerTask {
* be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
* @return true if all messages were sent, false if some need to be retried
*/
- private boolean sendRecords() {
+ // Visible for testing
+ boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
final SourceRecordWriteCounter counter =
toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
for (final SourceRecord preTransformRecord : toSend) {
- maybeThrowProducerSendException();
-
retryWithToleranceOperator.sourceRecord(preTransformRecord);
final SourceRecord record = transformationChain.apply(preTransformRecord);
final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
if (producerRecord == null || retryWithToleranceOperator.failed()) {
counter.skipRecord();
- commitTaskRecord(preTransformRecord, null);
+ recordDropped(preTransformRecord);
continue;
}
log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value());
- SubmittedRecord submittedRecord = submittedRecords.submit(record);
+ Optional<SubmittedRecords.SubmittedRecord> submittedRecord = prepareToSendRecord(preTransformRecord, producerRecord);
try {
- maybeCreateTopic(record.topic());
final String topic = producerRecord.topic();
+ maybeCreateTopic(topic);
producer.send(
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
- if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
- log.trace("Ignoring failed record send: {} failed to send record to {}: ",
- WorkerSourceTask.this, topic, e);
- // executeFailed here allows the use of existing logging infrastructure/configuration
- retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
- preTransformRecord, e);
- commitTaskRecord(preTransformRecord, null);
- } else {
- log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
- log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
- producerSendException.compareAndSet(null, e);
- }
+ log.debug("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
+ log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord);
+ producerSendFailed(false, producerRecord, preTransformRecord, e);
} else {
- submittedRecord.ack();
counter.completeRecord();
log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
- WorkerSourceTask.this,
+ AbstractWorkerSourceTask.this,
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
- commitTaskRecord(preTransformRecord, recordMetadata);
- if (isTopicTrackingEnabled) {
+ recordSent(preTransformRecord, producerRecord, recordMetadata);
+ submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
+ if (topicTrackingEnabled) {
recordActiveTopic(producerRecord.topic());
}
}
});
+ // Note that this will cause retries to take place within a transaction
} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ",
this, producerRecord.topic(), producerRecord.partition(), e);
toSend = toSend.subList(processed, toSend.size());
- submittedRecords.removeLastOccurrence(submittedRecord);
+ submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::drop);
counter.retryRemaining();
return false;
} catch (ConnectException e) {
@@ -407,21 +426,60 @@ class WorkerSourceTask extends WorkerTask {
log.trace("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);
throw e;
} catch (KafkaException e) {
- throw new ConnectException("Unrecoverable exception trying to send", e);
+ producerSendFailed(true, producerRecord, preTransformRecord, e);
}
processed++;
+ recordDispatched(preTransformRecord);
}
toSend = null;
return true;
}
+ protected List<SourceRecord> poll() throws InterruptedException {
+ try {
+ return task.poll();
+ } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+ log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
+ // Do nothing. Let the framework poll whenever it's ready.
+ return null;
+ }
+ }
+
+ /**
+ * Convert the source record into a producer record.
+ *
+ * @param record the transformed record
+ * @return the producer record which can be sent over to Kafka. A null is returned if the input is null or
+ * if an error was encountered during any of the converter stages.
+ */
+ protected ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
+ if (record == null) {
+ return null;
+ }
+
+ RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
+
+ byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
+ Stage.KEY_CONVERTER, keyConverter.getClass());
+
+ byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
+ Stage.VALUE_CONVERTER, valueConverter.getClass());
+
+ if (retryWithToleranceOperator.failed()) {
+ return null;
+ }
+
+ return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
+ ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
+ }
+
// Due to transformations that may change the destination topic of a record (such as
// RegexRouter) topic creation can not be batched for multiple topics
private void maybeCreateTopic(String topic) {
if (!topicCreation.isTopicCreationRequired(topic)) {
log.trace("Topic creation by the connector is disabled or the topic {} was previously created." +
- "If auto.create.topics.enable is enabled on the broker, " +
- "the topic will be created with default settings", topic);
+ "If auto.create.topics.enable is enabled on the broker, " +
+ "the topic will be created with default settings", topic);
return;
}
log.info("The task will send records to topic '{}' for the first time. Checking "
@@ -454,7 +512,7 @@ class WorkerSourceTask extends WorkerTask {
}
}
- private RecordHeaders convertHeaderFor(SourceRecord record) {
+ protected RecordHeaders convertHeaderFor(SourceRecord record) {
Headers headers = record.headers();
RecordHeaders result = new RecordHeaders();
if (headers != null) {
@@ -468,7 +526,7 @@ class WorkerSourceTask extends WorkerTask {
return result;
}
- private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
+ protected void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
try {
task.commitRecord(record, metadata);
} catch (Throwable t) {
@@ -476,113 +534,7 @@ class WorkerSourceTask extends WorkerTask {
}
}
- public boolean commitOffsets() {
- long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
-
- log.debug("{} Committing offsets", this);
-
- long started = time.milliseconds();
- long timeout = started + commitTimeoutMs;
-
- CommittableOffsets offsetsToCommit;
- synchronized (this) {
- offsetsToCommit = this.committableOffsets;
- this.committableOffsets = CommittableOffsets.EMPTY;
- }
-
- if (committableOffsets.isEmpty()) {
- log.debug("{} Either no records were produced by the task since the last offset commit, "
- + "or every record has been filtered out by a transformation "
- + "or dropped due to transformation or conversion errors.",
- this
- );
- // We continue with the offset commit process here instead of simply returning immediately
- // in order to invoke SourceTask::commit and record metrics for a successful offset commit
- } else {
- log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
- if (committableOffsets.hasPending()) {
- log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
- + "The source partition with the most pending messages is {}, with {} pending messages",
- this,
- committableOffsets.numUncommittableMessages(),
- committableOffsets.numDeques(),
- committableOffsets.largestDequePartition(),
- committableOffsets.largestDequeSize()
- );
- } else {
- log.debug("{} There are currently no pending messages for this offset commit; "
- + "all messages dispatched to the task's producer since the last commit have been acknowledged",
- this
- );
- }
- }
-
- // Update the offset writer with any new offsets for records that have been acked.
- // The offset writer will continue to track all offsets until they are able to be successfully flushed.
- // IOW, if the offset writer fails to flush, it keeps those offset for the next attempt,
- // though we may update them here with newer offsets for acked records.
- offsetsToCommit.offsets().forEach(offsetWriter::offset);
-
- if (!offsetWriter.beginFlush()) {
- // There was nothing in the offsets to process, but we still mark a successful offset commit.
- long durationMillis = time.milliseconds() - started;
- recordCommitSuccess(durationMillis);
- log.debug("{} Finished offset commitOffsets successfully in {} ms",
- this, durationMillis);
-
- commitSourceTask();
- return true;
- }
-
- // Now we can actually flush the offsets to user storage.
- Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
- if (error != null) {
- log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
- } else {
- log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
- }
- });
- // Very rare case: offsets were unserializable and we finished immediately, unable to store
- // any data
- if (flushFuture == null) {
- offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, null);
- return false;
- }
- try {
- flushFuture.get(Math.max(timeout - time.milliseconds(), 0), TimeUnit.MILLISECONDS);
- // There's a small race here where we can get the callback just as this times out (and log
- // success), but then catch the exception below and cancel everything. This won't cause any
- // errors, is only wasteful in this minor edge case, and the worst result is that the log
- // could look a little confusing.
- } catch (InterruptedException e) {
- log.warn("{} Flush of offsets interrupted, cancelling", this);
- offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, e);
- return false;
- } catch (ExecutionException e) {
- log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
- offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, e);
- return false;
- } catch (TimeoutException e) {
- log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
- offsetWriter.cancelFlush();
- recordCommitFailure(time.milliseconds() - started, null);
- return false;
- }
-
- long durationMillis = time.milliseconds() - started;
- recordCommitSuccess(durationMillis);
- log.debug("{} Finished commitOffsets successfully in {} ms",
- this, durationMillis);
-
- commitSourceTask();
-
- return true;
- }
-
- private void commitSourceTask() {
+ protected void commitSourceTask() {
try {
this.task.commit();
} catch (Throwable t) {
@@ -590,13 +542,6 @@ class WorkerSourceTask extends WorkerTask {
}
}
- @Override
- public String toString() {
- return "WorkerSourceTask{" +
- "id=" + id +
- '}';
- }
-
protected void recordPollReturned(int numRecordsInBatch, long duration) {
sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
}
@@ -639,7 +584,7 @@ class WorkerSourceTask extends WorkerTask {
}
static class SourceTaskMetricsGroup {
- private final MetricGroup metricGroup;
+ private final ConnectMetrics.MetricGroup metricGroup;
private final Sensor sourceRecordPoll;
private final Sensor sourceRecordWrite;
private final Sensor sourceRecordActiveCount;
@@ -690,7 +635,7 @@ class WorkerSourceTask extends WorkerTask {
sourceRecordActiveCount.record(activeRecordCount);
}
- protected MetricGroup metricGroup() {
+ protected ConnectMetrics.MetricGroup metricGroup() {
return metricGroup;
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
new file mode 100644
index 0000000000..faf0e8cbd8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+ private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+ // True between Producer::beginTransaction and the next commit or abort performed by this task
+ private boolean transactionOpen;
+ // Records awaiting SourceTask::commitRecord, kept in insertion order; dropped records are stored with
+ // null metadata. Every access in this class synchronizes on the map itself.
+ private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords;
+
+ // Decides when transactions are committed (or aborted), according to the connector's transaction boundary
+ private final TransactionBoundaryManager transactionBoundaryManager;
+ // Tracks the number of records dispatched in the current transaction
+ private final TransactionMetricsGroup transactionMetrics;
+
+ // Started by this task during initialization; also used to identify the task's own offsets topic
+ private final ConnectorOffsetBackingStore offsetBackingStore;
+ // Hooks run immediately before and after the transactional producer is initialized
+ private final Runnable preProducerCheck;
+ private final Runnable postProducerCheck;
+
+ /**
+ * Create a source task wrapper that writes records and source offsets inside producer transactions.
+ * Most arguments are forwarded unchanged to {@link AbstractWorkerSourceTask}; only the ones with
+ * behavior specific to this class are described here.
+ *
+ * @param offsetBackingStore store started by this task in {@link #prepareToInitializeTask()}; its primary
+ * offsets topic is the one topic the task is not allowed to produce to
+ * @param sourceConfig connector configuration; supplies the transaction boundary and, for interval-based
+ * boundaries, the optional commit interval
+ * @param preProducerCheck hook run before the offset store is started and the producer is initialized
+ * @param postProducerCheck hook run after {@code Producer::initTransactions} has returned
+ */
+ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+ SourceTask task,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ TransformationChain<SourceRecord> transformationChain,
+ Producer<byte[], byte[]> producer,
+ TopicAdmin admin,
+ Map<String, TopicCreationGroup> topicGroups,
+ CloseableOffsetStorageReader offsetReader,
+ OffsetStorageWriter offsetWriter,
+ ConnectorOffsetBackingStore offsetBackingStore,
+ WorkerConfig workerConfig,
+ ClusterConfigState configState,
+ ConnectMetrics connectMetrics,
+ ClassLoader loader,
+ Time time,
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ StatusBackingStore statusBackingStore,
+ SourceConnectorConfig sourceConfig,
+ Executor closeExecutor,
+ Runnable preProducerCheck,
+ Runnable postProducerCheck) {
+ // The transaction context handed to the task is non-null only for connector-defined boundaries
+ super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+ new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
+ producer, admin, topicGroups, offsetReader, offsetWriter, offsetBackingStore, workerConfig, connectMetrics,
+ loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+
+ this.transactionOpen = false;
+ this.commitableRecords = new LinkedHashMap<>();
+ this.offsetBackingStore = offsetBackingStore;
+
+ this.preProducerCheck = preProducerCheck;
+ this.postProducerCheck = postProducerCheck;
+
+ // Reuse the same transaction context instance that was given to the task via its task context
+ this.transactionBoundaryManager = buildTransactionManager(workerConfig, sourceConfig, sourceTaskContext.transactionContext());
+ this.transactionMetrics = new TransactionMetricsGroup(id, connectMetrics);
+ }
+
+ /**
+ * Create the transaction context exposed to the task.
+ *
+ * @param sourceConfig the connector configuration to read the transaction boundary from
+ * @return a fresh context when the connector defines its own transaction boundaries, otherwise null
+ */
+ private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConfig) {
+     if (TransactionBoundary.CONNECTOR.equals(sourceConfig.transactionBoundary())) {
+         return new WorkerTransactionContext();
+     }
+     return null;
+ }
+
+ /**
+ * Runs the pre-producer check, starts the offset store, initializes the transactional producer and
+ * runs the post-producer check, in that order. Bails out between steps if the task is stopping.
+ */
+ @Override
+ protected void prepareToInitializeTask() {
+     preProducerCheck.run();
+
+     // The offset store has its own producer and consumer; avoid starting it if we were shut down in the meantime
+     if (isStopping()) {
+         return;
+     }
+     offsetBackingStore.start();
+
+     // Initializing the transactional producer may accidentally fence out other, later task generations,
+     // so skip it if we were shut down in the meantime
+     if (isStopping()) {
+         return;
+     }
+     producer.initTransactions();
+
+     postProducerCheck.run();
+ }
+
+ /**
+ * Signals the transaction boundary manager that a new transaction cycle is starting.
+ */
+ @Override
+ protected void prepareToEnterSendLoop() {
+ transactionBoundaryManager.initialize();
+ }
+
+ /**
+ * This implementation has nothing to do for this hook.
+ */
+ @Override
+ protected void beginSendIteration() {
+ // No-op
+ }
+
+ /**
+ * This implementation has nothing to do for this hook.
+ */
+ @Override
+ protected void prepareToPollTask() {
+ // No-op
+ }
+
+ /**
+ * Tracks a record reported as dropped so that it is still handed to the task's commitRecord hook
+ * (with null metadata) after the next successful transaction commit, then gives the transaction
+ * boundary manager a chance to commit based on this record.
+ *
+ * @param record the dropped record
+ */
+ @Override
+ protected void recordDropped(SourceRecord record) {
+ synchronized (commitableRecords) {
+ // Null metadata marks the record as never having been written to Kafka
+ commitableRecords.put(record, null);
+ }
+ transactionBoundaryManager.maybeCommitTransactionForRecord(record);
+ }
+
+ /**
+ * Rejects records aimed at the task's own offsets topic and makes sure a transaction is open
+ * before the record is handed to the producer.
+ *
+ * @param sourceRecord the record as returned by the task
+ * @param producerRecord the converted record about to be dispatched
+ * @return always empty for this implementation
+ * @throws ConnectException if the record targets the task's primary offsets topic
+ */
+ @Override
+ protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+         SourceRecord sourceRecord,
+         ProducerRecord<byte[], byte[]> producerRecord
+ ) {
+     boolean targetsOwnOffsetsTopic = offsetBackingStore.primaryOffsetsTopic().equals(producerRecord.topic());
+     if (targetsOwnOffsetsTopic) {
+         // Guards against a deadlock:
+         // 1. The task provides a record whose topic is the task's offsets topic
+         // 2. The record is dispatched in a transaction that stays open at least until the next poll
+         // 3. During that SourceTask::poll the task requests offsets from the worker, which requires
+         //    reading to the end of the offsets topic and blocks until every open transaction on the
+         //    topic is committed or aborted
+         throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
+     }
+
+     maybeBeginTransaction();
+     return Optional.empty();
+ }
+
+ /**
+ * Stages the record's source offset with the offset writer, counts the record towards the size of
+ * the current transaction, and gives the transaction boundary manager a chance to commit.
+ *
+ * @param record the record that was dispatched
+ */
+ @Override
+ protected void recordDispatched(SourceRecord record) {
+ // Offsets are converted & serialized in the OffsetWriter
+ // Important: we only save offsets for the record after it has been accepted by the producer; this way,
+ // we commit those offsets if and only if the record is sent successfully.
+ offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+ transactionMetrics.addRecord();
+ transactionBoundaryManager.maybeCommitTransactionForRecord(record);
+ }
+
+ /**
+ * Gives the transaction boundary manager a chance to commit at a batch boundary.
+ */
+ @Override
+ protected void batchDispatched() {
+ transactionBoundaryManager.maybeCommitTransactionForBatch();
+ }
+
+ /**
+ * Remembers the metadata of a sent record so that the task's commitRecord hook can be invoked for it
+ * after the enclosing transaction has been committed.
+ *
+ * @param sourceRecord the record as returned by the task
+ * @param producerRecord the converted record that was sent (unused here)
+ * @param recordMetadata the metadata for the sent record
+ */
+ @Override
+ protected void recordSent(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ RecordMetadata recordMetadata
+ ) {
+ // NOTE(review): presumably invoked from the producer callback thread, hence the lock; confirm in the superclass
+ synchronized (commitableRecords) {
+ commitableRecords.put(sourceRecord, recordMetadata);
+ }
+ }
+
+ /**
+ * Reacts to a failed producer send. Synchronous failures fail the task immediately; asynchronous
+ * ones are ignored here.
+ *
+ * @param synchronous whether the failure was reported synchronously
+ * @param producerRecord the record that could not be sent
+ * @param preTransformRecord the corresponding source record
+ * @param e the failure reported by the producer
+ */
+ @Override
+ protected void producerSendFailed(
+         boolean synchronous,
+         ProducerRecord<byte[], byte[]> producerRecord,
+         SourceRecord preTransformRecord,
+         Exception e
+ ) {
+     if (!synchronous) {
+         // Nothing to do; asynchronously-reported producer exceptions should be bubbled up again
+         // by Producer::commitTransaction
+         return;
+     }
+     throw maybeWrapProducerSendException("Unrecoverable exception trying to send", e);
+ }
+
+ /**
+ * Attempts one last transaction commit, unless the task has failed.
+ *
+ * @param failed whether the task has failed
+ */
+ @Override
+ protected void finalOffsetCommit(boolean failed) {
+     if (!failed) {
+         // Committing here should be safe even if the send loop was in the middle of retrying on
+         // RetriableExceptions, because source offsets are only tracked for records that were
+         // successfully dispatched to the producer.
+         // Records that were being retried (and any records after them in the batch) won't be part of
+         // the transaction and their offsets won't be committed, but (unless the user has requested
+         // connector-defined transaction boundaries), it's better to commit some data than none.
+         transactionBoundaryManager.maybeCommitFinalTransaction();
+     } else {
+         log.debug("Skipping final offset commit as task has failed");
+     }
+ }
+
+ /**
+ * Runs the superclass pause logic, then tries to commit any open transaction before the task idles.
+ */
+ @Override
+ protected void onPause() {
+ super.onPause();
+ // Commit the transaction now so that we don't end up with a hanging transaction, or worse, get fenced out
+ // and fail the task once unpaused
+ transactionBoundaryManager.maybeCommitFinalTransaction();
+ }
+
+ /**
+ * Begins a producer transaction unless one is already open.
+ */
+ private void maybeBeginTransaction() {
+     if (transactionOpen) {
+         return;
+     }
+     producer.beginTransaction();
+     transactionOpen = true;
+ }
+
+ /**
+ * Flushes pending source offsets and commits the current producer transaction, then notifies the task.
+ * <p>
+ * Steps, in order: ensure a transaction is open; start an offset flush if the writer has anything to
+ * flush; commit the producer transaction; on any failure record a commit failure, cancel the flush and
+ * throw; otherwise record metrics, invoke the task's per-record commit hook for every tracked record,
+ * and finally invoke the task's commit hook.
+ *
+ * @throws ConnectException (via {@link #maybeWrapProducerSendException}) if the offset flush or the
+ * producer commit fails
+ */
+ private void commitTransaction() {
+ log.debug("{} Committing offsets", this);
+
+ long started = time.milliseconds();
+
+ // We might have just aborted a transaction, in which case we'll have to begin a new one
+ // in order to commit offsets
+ maybeBeginTransaction();
+
+ // Holds the first failure seen, whether from the offset flush callback or the producer commit
+ AtomicReference<Throwable> flushError = new AtomicReference<>();
+ if (offsetWriter.beginFlush()) {
+ // Now we can actually write the offsets to the internal topic.
+ // No need to track the flush future here since it's guaranteed to complete by the time
+ // Producer::commitTransaction completes
+ // We do have to track failures for that callback though, since they may originate from outside
+ // the producer (i.e., the offset writer or the backing offset store), and would not cause
+ // Producer::commitTransaction to fail
+ offsetWriter.doFlush((error, result) -> {
+ if (error != null) {
+ log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
+ flushError.compareAndSet(null, error);
+ } else {
+ log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
+ }
+ });
+ }
+
+ // Commit the transaction
+ // Blocks until all outstanding records have been sent and ack'd
+ try {
+ producer.commitTransaction();
+ } catch (Throwable t) {
+ log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
+ flushError.compareAndSet(null, t);
+ }
+
+ // The transaction is treated as closed whether or not the commit succeeded
+ transactionOpen = false;
+
+ Throwable error = flushError.get();
+ if (error != null) {
+ // NOTE(review): the captured error is not passed to recordCommitFailure here; confirm that is intended
+ recordCommitFailure(time.milliseconds() - started, null);
+ offsetWriter.cancelFlush();
+ throw maybeWrapProducerSendException(
+ "Failed to flush offsets and/or records for task " + id,
+ error
+ );
+ }
+
+ transactionMetrics.commitTransaction();
+
+ long durationMillis = time.milliseconds() - started;
+ recordCommitSuccess(durationMillis);
+ log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
+
+ // Synchronize in order to guarantee that writes on other threads are picked up by this one
+ synchronized (commitableRecords) {
+ // Iteration follows insertion order; dropped records are passed along with null metadata
+ commitableRecords.forEach(this::commitTaskRecord);
+ commitableRecords.clear();
+ }
+ commitSourceTask();
+ }
+
+ /**
+ * Builds the exception to throw for a producer or offset flush failure.
+ *
+ * @param message message to use when the failure does not look like a transaction timeout
+ * @param error the underlying failure
+ * @return an exception with transaction-timeout guidance if the failure looks like a timeout,
+ * otherwise a {@link ConnectException} carrying the given message; the cause is {@code error} either way
+ */
+ private RuntimeException maybeWrapProducerSendException(String message, Throwable error) {
+     return isPossibleTransactionTimeoutError(error)
+             ? wrapTransactionTimeoutError(error)
+             : new ConnectException(message, error);
+ }
+
+ /**
+ * Checks whether a failure, or its direct cause, is an {@link InvalidProducerEpochException}.
+ *
+ * @param error the failure to inspect; must not be null
+ * @return true if the failure may have been caused by an expired producer transaction
+ */
+ private static boolean isPossibleTransactionTimeoutError(Throwable error) {
+     if (error instanceof InvalidProducerEpochException) {
+         return true;
+     }
+     Throwable cause = error.getCause();
+     return cause instanceof InvalidProducerEpochException;
+ }
+
+ private ConnectException wrapTransactionTimeoutError(Throwable error) {
+ return new ConnectException(
+ "The task " + id + " was unable to finish writing records to Kafka before its producer transaction expired. "
+ + "It may be necessary to reconfigure this connector in order for it to run healthily with exactly-once support. "
+ + "Options for this include: tune the connector's producer configuration for higher throughput, "
+ + "increase the transaction timeout for the connector's producers, "
+ + "decrease the offset commit interval (if using interval-based transaction boundaries), "
+ + "or use the 'poll' transaction boundary (if the connector is not already configured to use it).",
+ error
+ );
+ }
+
+ /**
+ * @return a short description containing the class name and the task id; used as the prefix of log messages
+ */
+ @Override
+ public String toString() {
+ return "ExactlyOnceWorkerSourceTask{" +
+ "id=" + id +
+ '}';
+ }
+
+ private abstract class TransactionBoundaryManager {
+ protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
+ return false;
+ }
+
+ protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
+ return false;
+ }
+
+ protected boolean shouldCommitFinalTransaction() {
+ return false;
+ }
+
+ /**
+ * Hook to signal that a new transaction cycle has been started. May be invoked
+ * multiple times if the task is paused and then resumed. It can be assumed that
+ * a new transaction is created at least every time an existing transaction is
+ * committed; this is just a hook to notify that a new transaction may have been
+ * created outside of that flow as well.
+ */
+ protected void initialize() {
+ }
+
+ public void maybeCommitTransactionForRecord(SourceRecord record) {
+ maybeCommitTransaction(shouldCommitTransactionForRecord(record));
+ }
+
+ public void maybeCommitTransactionForBatch() {
+ maybeCommitTransaction(shouldCommitTransactionForBatch(time.milliseconds()));
+ }
+
+ public void maybeCommitFinalTransaction() {
+ maybeCommitTransaction(shouldCommitFinalTransaction());
+ }
+
+ private void maybeCommitTransaction(boolean shouldCommit) {
+ if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) {
+ try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) {
+ commitTransaction();
+ }
+ }
+ }
+ }
+
+ /**
+ * Create the transaction boundary manager matching the connector's configured transaction boundary.
+ * <ul>
+ * <li>POLL: commit after every dispatched batch and on pause/shutdown.</li>
+ * <li>INTERVAL: commit after a batch once the commit interval has elapsed since the last commit
+ * decision, and on pause/shutdown. The interval comes from the connector config, falling back to the
+ * worker's offset commit interval.</li>
+ * <li>CONNECTOR: commit or abort only when the task requests it through its transaction context.</li>
+ * </ul>
+ *
+ * @param workerConfig worker configuration; supplies the fallback commit interval
+ * @param sourceConfig connector configuration; supplies the boundary type and optional interval
+ * @param transactionContext the context shared with the task; required for CONNECTOR boundaries
+ * @return the manager for the configured boundary
+ * @throws NullPointerException if the boundary is CONNECTOR and {@code transactionContext} is null
+ * @throws IllegalArgumentException if the boundary is not recognized
+ */
+ private TransactionBoundaryManager buildTransactionManager(
+         WorkerConfig workerConfig,
+         SourceConnectorConfig sourceConfig,
+         WorkerTransactionContext transactionContext) {
+     TransactionBoundary boundary = sourceConfig.transactionBoundary();
+     switch (boundary) {
+         case POLL:
+             return new TransactionBoundaryManager() {
+                 @Override
+                 protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
+                     return true;
+                 }
+
+                 @Override
+                 protected boolean shouldCommitFinalTransaction() {
+                     return true;
+                 }
+             };
+
+         case INTERVAL:
+             long transactionBoundaryInterval = Optional.ofNullable(sourceConfig.transactionBoundaryInterval())
+                     .orElse(workerConfig.offsetCommitInterval());
+             return new TransactionBoundaryManager() {
+                 private final long commitInterval = transactionBoundaryInterval;
+                 private long lastCommit;
+
+                 @Override
+                 public void initialize() {
+                     this.lastCommit = time.milliseconds();
+                 }
+
+                 @Override
+                 protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
+                     // Use the timestamp supplied by the caller for both the comparison and the bookkeeping,
+                     // so that the instant we compare against is the same instant we remember
+                     if (currentTimeMs >= lastCommit + commitInterval) {
+                         lastCommit = currentTimeMs;
+                         return true;
+                     } else {
+                         return false;
+                     }
+                 }
+
+                 @Override
+                 protected boolean shouldCommitFinalTransaction() {
+                     return true;
+                 }
+             };
+
+         case CONNECTOR:
+             Objects.requireNonNull(transactionContext, "Transaction context must be provided when using connector-defined transaction boundaries");
+             return new TransactionBoundaryManager() {
+                 @Override
+                 protected boolean shouldCommitFinalTransaction() {
+                     // Honor a pending batch-level commit or abort request, if any
+                     return shouldCommitTransactionForBatch(time.milliseconds());
+                 }
+
+                 @Override
+                 protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
+                     if (transactionContext.shouldAbortBatch()) {
+                         log.info("Aborting transaction for batch as requested by connector");
+                         abortTransaction();
+                         // We abort the transaction, which causes all the records up to this point to be dropped, but we still want to
+                         // commit offsets so that the task doesn't see the same records all over again
+                         return true;
+                     }
+                     return transactionContext.shouldCommitBatch();
+                 }
+
+                 @Override
+                 protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
+                     if (transactionContext.shouldAbortOn(record)) {
+                         log.info("Aborting transaction for record on topic {} as requested by connector", record.topic());
+                         log.trace("Last record in aborted transaction: {}", record);
+                         abortTransaction();
+                         // We abort the transaction, which causes all the records up to this point to be dropped, but we still want to
+                         // commit offsets so that the task doesn't see the same records all over again
+                         return true;
+                     }
+                     return transactionContext.shouldCommitOn(record);
+                 }
+
+                 // Aborts the producer transaction and resets the per-transaction bookkeeping
+                 private void abortTransaction() {
+                     producer.abortTransaction();
+                     transactionMetrics.abortTransaction();
+                     transactionOpen = false;
+                 }
+             };
+         default:
+             throw new IllegalArgumentException("Unrecognized transaction boundary: " + boundary);
+     }
+ }
+
+ /**
+ * @return the metrics group tracking transaction sizes for this task
+ */
+ TransactionMetricsGroup transactionMetricsGroup() {
+ return transactionMetrics;
+ }
+
+
+ /**
+ * Tracks how many records are dispatched per transaction and reports the size of every committed
+ * transaction to a sensor with average, minimum and maximum statistics.
+ */
+ static class TransactionMetricsGroup {
+     private final Sensor transactionSize;
+     // Number of records added since the last commit or abort
+     private int size;
+     private final ConnectMetrics.MetricGroup metricGroup;
+
+     public TransactionMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
+         ConnectMetricsRegistry metricsRegistry = connectMetrics.registry();
+         String taskNumber = Integer.toString(id.task());
+         metricGroup = connectMetrics.group(metricsRegistry.sourceTaskGroupName(),
+                 metricsRegistry.connectorTagName(), id.connector(),
+                 metricsRegistry.taskTagName(), taskNumber);
+
+         transactionSize = metricGroup.sensor("transaction-size");
+         transactionSize.add(metricGroup.metricName(metricsRegistry.transactionSizeAvg), new Avg());
+         transactionSize.add(metricGroup.metricName(metricsRegistry.transactionSizeMin), new Min());
+         transactionSize.add(metricGroup.metricName(metricsRegistry.transactionSizeMax), new Max());
+     }
+
+     /** Count one more record in the current transaction. */
+     void addRecord() {
+         size += 1;
+     }
+
+     /** Discard the running count without reporting it. */
+     void abortTransaction() {
+         size = 0;
+     }
+
+     /** Report the running count to the sensor, then reset it. */
+     void commitTransaction() {
+         transactionSize.record(size);
+         size = 0;
+     }
+
+     protected ConnectMetrics.MetricGroup metricGroup() {
+         return metricGroup;
+     }
+
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index 6cdd2c1842..b77a6fa984 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
* returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
* {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
- * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link SubmittedRecord#drop() dropped}.
* Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
* {@link SubmittedRecord#ack() acknowledged} from a different thread.
*/
@@ -54,13 +54,13 @@ class SubmittedRecords {
/**
* Enqueue a new source record before dispatching it to a producer.
* The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
- * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully
+ * producer callback, or {@link SubmittedRecord#drop() dropped} if the record could not be successfully
* sent to the producer.
- *
+ *
* @param record the record about to be dispatched; may not be null but may have a null
* {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
* @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
- * the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer
+ * the producer, or {@link SubmittedRecord#drop() dropped} if synchronously rejected by the producer
*/
@SuppressWarnings("unchecked")
public SubmittedRecord submit(SourceRecord record) {
@@ -78,32 +78,6 @@ class SubmittedRecords {
return result;
}
- /**
- * Remove a source record and do not take it into account any longer when tracking offsets.
- * Useful if the record has been synchronously rejected by the producer.
- * If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found
- * (traversing from the end of the deque backward) will be removed.
- * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
- * @return whether an instance of the record was removed
- */
- public boolean removeLastOccurrence(SubmittedRecord record) {
- Deque<SubmittedRecord> deque = records.get(record.partition());
- if (deque == null) {
- log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
- return false;
- }
- boolean result = deque.removeLastOccurrence(record);
- if (deque.isEmpty()) {
- records.remove(record.partition());
- }
- if (result) {
- messageAcked();
- } else {
- log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
- }
- return result;
- }
-
/**
* Clear out any acknowledged records at the head of the deques and return a {@link CommittableOffsets snapshot} of the offsets and offset metadata
* accrued between the last time this method was invoked and now. This snapshot can be {@link CommittableOffsets#updatedWith(CommittableOffsets) combined}
@@ -187,7 +161,7 @@ class SubmittedRecords {
}
}
- class SubmittedRecord {
+ public class SubmittedRecord {
private final Map<String, Object> partition;
private final Map<String, Object> offset;
private final AtomicBoolean acked;
@@ -208,6 +182,34 @@ class SubmittedRecords {
}
}
+ /**
+ * Stop tracking this record so that it no longer affects which offsets are committable.
+ * Intended for records that the producer rejected synchronously.
+ * If this record was submitted more than once, only the occurrence closest to the end of
+ * the deque is removed.
+ * <p>
+ * This is <strong>not safe</strong> to be called from a different thread
+ * than what called {@link SubmittedRecords#submit(SourceRecord)}.
+ * @return whether this instance was dropped
+ */
+ public boolean drop() {
+     Deque<SubmittedRecord> submitted = records.get(partition);
+     if (submitted == null) {
+         log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", partition);
+         return false;
+     }
+
+     boolean removed = submitted.removeLastOccurrence(this);
+     // Forget the partition entirely once nothing is left in its deque
+     if (submitted.isEmpty()) {
+         records.remove(partition);
+     }
+
+     if (!removed) {
+         log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", partition);
+         return false;
+     }
+     messageAcked();
+     return true;
+ }
+
private boolean acked() {
return acked.get();
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 34258f82fb..d06a06a73a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
@@ -41,6 +42,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
@@ -54,10 +56,10 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
@@ -75,7 +77,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -115,11 +120,11 @@ public class Worker {
private final WorkerConfig config;
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
- private final OffsetBackingStore offsetBackingStore;
+ private final OffsetBackingStore globalOffsetBackingStore;
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
- private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+ private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
private final WorkerConfigTransformer workerConfigTransformer;
private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
@@ -128,9 +133,9 @@ public class Worker {
Time time,
Plugins plugins,
WorkerConfig config,
- OffsetBackingStore offsetBackingStore,
+ OffsetBackingStore globalOffsetBackingStore,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
- this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
+ this(workerId, time, plugins, config, globalOffsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
}
Worker(
@@ -138,7 +143,7 @@ public class Worker {
Time time,
Plugins plugins,
WorkerConfig config,
- OffsetBackingStore offsetBackingStore,
+ OffsetBackingStore globalOffsetBackingStore,
ExecutorService executorService,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
@@ -156,8 +161,8 @@ public class Worker {
this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig);
this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig);
- this.offsetBackingStore = offsetBackingStore;
- this.offsetBackingStore.configure(config);
+ this.globalOffsetBackingStore = globalOffsetBackingStore;
+ this.globalOffsetBackingStore.configure(config);
this.workerConfigTransformer = initConfigTransformer();
@@ -191,8 +196,11 @@ public class Worker {
public void start() {
log.info("Worker starting");
- offsetBackingStore.start();
- sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
+ globalOffsetBackingStore.start();
+
+ sourceTaskOffsetCommitter = config.exactlyOnceSourceEnabled()
+ ? Optional.empty()
+ : Optional.of(new SourceTaskOffsetCommitter(config));
connectorStatusMetricsGroup = new ConnectorStatusMetricsGroup(metrics, tasks, herder);
@@ -219,9 +227,9 @@ public class Worker {
}
long timeoutMs = limit - time.milliseconds();
- sourceTaskOffsetCommitter.close(timeoutMs);
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.close(timeoutMs));
- offsetBackingStore.stop();
+ globalOffsetBackingStore.stop();
metrics.stop();
log.info("Worker stopped");
@@ -285,14 +293,25 @@ public class Worker {
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = plugins.newConnector(connClass);
- final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector)
- ? new SinkConnectorConfig(plugins, connProps)
- : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
-
- final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
- offsetBackingStore, connName, internalKeyConverter, internalValueConverter);
+ final ConnectorConfig connConfig;
+ final CloseableOffsetStorageReader offsetReader;
+ final ConnectorOffsetBackingStore offsetStore;
+ if (ConnectUtils.isSinkConnector(connector)) {
+ connConfig = new SinkConnectorConfig(plugins, connProps);
+ offsetReader = null;
+ offsetStore = null;
+ } else {
+ SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+ connConfig = sourceConfig;
+
+ // Set up the offset backing store for this connector instance
+ // (This logic is implemented in a follow-up pull request: https://github.com/apache/kafka/pull/11781)
+ offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
+ offsetStore.configure(config);
+ offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
+ }
workerConnector = new WorkerConnector(
- connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, connectorLoader);
+ connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
workerConnector.transitionTo(initialState, onConnectorStateChange);
Plugins.compareAndSwapLoaders(savedLoader);
@@ -497,22 +516,95 @@ public class Worker {
}
/**
- * Start a task managed by this worker.
+ * Start a sink task managed by this worker.
+ *
+ * @param id the task ID.
+ * @param configState the most recent {@link ClusterConfigState} known to the worker
+ * @param connProps the connector properties.
+ * @param taskProps the task's properties.
+ * @param statusListener a listener for the runtime status transitions of the task.
+ * @param initialState the initial state of the connector.
+ * @return true if the task started successfully.
+ */
+ public boolean startSinkTask(
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ Map<String, String> connProps,
+ Map<String, String> taskProps,
+ TaskStatus.Listener statusListener,
+ TargetState initialState
+ ) {
+ return startTask(id, connProps, taskProps, statusListener,
+ new SinkTaskBuilder(id, configState, statusListener, initialState));
+ }
+
+ /**
+ * Start a source task managed by this worker using older behavior that does not provide exactly-once support.
*
* @param id the task ID.
+ * @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial state of the connector.
* @return true if the task started successfully.
*/
- public boolean startTask(
+ public boolean startSourceTask(
ConnectorTaskId id,
ClusterConfigState configState,
Map<String, String> connProps,
Map<String, String> taskProps,
TaskStatus.Listener statusListener,
TargetState initialState
+ ) {
+ return startTask(id, connProps, taskProps, statusListener,
+ new SourceTaskBuilder(id, configState, statusListener, initialState));
+ }
+
+ /**
+ * Start a source task with exactly-once support managed by this worker.
+ *
+ * @param id the task ID.
+ * @param configState the most recent {@link ClusterConfigState} known to the worker
+ * @param connProps the connector properties.
+ * @param taskProps the task's properties.
+ * @param statusListener a listener for the runtime status transitions of the task.
+ * @param initialState the initial state of the connector.
+ * @param preProducerCheck a preflight check that should be performed before the task initializes its transactional producer.
+ * @param postProducerCheck a preflight check that should be performed after the task initializes its transactional producer,
+ * but before producing any source records or offsets.
+ * @return true if the task started successfully.
+ */
+ public boolean startExactlyOnceSourceTask(
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ Map<String, String> connProps,
+ Map<String, String> taskProps,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ Runnable preProducerCheck,
+ Runnable postProducerCheck
+ ) {
+ return startTask(id, connProps, taskProps, statusListener,
+ new ExactlyOnceSourceTaskBuilder(id, configState, statusListener, initialState, preProducerCheck, postProducerCheck));
+ }
+
+ /**
+ * Start a task managed by this worker.
+ *
+ * @param id the task ID.
+ * @param connProps the connector properties.
+ * @param taskProps the task's properties.
+ * @param statusListener a listener for the runtime status transitions of the task.
+ * @param taskBuilder the {@link TaskBuilder} used to create the {@link WorkerTask} that manages the lifecycle of the task.
+ * @return true if the task started successfully.
+ */
+ private boolean startTask(
+ ConnectorTaskId id,
+ Map<String, String> connProps,
+ Map<String, String> taskProps,
+ TaskStatus.Listener statusListener,
+ TaskBuilder taskBuilder
) {
final WorkerTask workerTask;
final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
@@ -563,8 +655,15 @@ public class Worker {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
}
- workerTask = buildWorkerTask(configState, connConfig, id, task, taskStatusListener,
- initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
+ workerTask = taskBuilder
+ .withTask(task)
+ .withConnectorConfig(connConfig)
+ .withKeyConverter(keyConverter)
+ .withValueConverter(valueConverter)
+ .withHeaderConverter(headerConverter)
+ .withClassloader(connectorLoader)
+ .build();
+
workerTask.initialize(taskConfig);
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
@@ -583,7 +682,7 @@ public class Worker {
executor.submit(workerTask);
if (workerTask instanceof WorkerSourceTask) {
- sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
}
return true;
}
@@ -643,74 +742,28 @@ public class Worker {
}
}
- private WorkerTask buildWorkerTask(ClusterConfigState configState,
- ConnectorConfig connConfig,
- ConnectorTaskId id,
- Task task,
- TaskStatus.Listener statusListener,
- TargetState initialState,
- Converter keyConverter,
- Converter valueConverter,
- HeaderConverter headerConverter,
- ClassLoader loader) {
- ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
- final Class<? extends Connector> connectorClass = plugins.connectorClass(
- connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
- connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
- // Decide which type of worker task we need based on the type of task.
- if (task instanceof SourceTask) {
- SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
- connConfig.originalsStrings(), config.topicCreationEnable());
- retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
- TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
- log.info("Initializing: {}", transformationChain);
- CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
- internalKeyConverter, internalValueConverter);
- OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
- internalKeyConverter, internalValueConverter);
- Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
- connectorClientConfigOverridePolicy, kafkaClusterId);
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
- TopicAdmin admin;
- Map<String, TopicCreationGroup> topicCreationGroups;
- if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
- Map<String, Object> adminProps = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
- sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
- admin = new TopicAdmin(adminProps);
- topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
- } else {
- admin = null;
- topicCreationGroups = null;
- }
-
- // Note we pass the configState as it performs dynamic transformations under the covers
- return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
- headerConverter, transformationChain, producer, admin, topicCreationGroups,
- offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
- } else if (task instanceof SinkTask) {
- TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
- log.info("Initializing: {}", transformationChain);
- SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
- retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
- WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
- keyConverter, valueConverter, headerConverter);
-
- Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
-
- return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
- valueConverter, headerConverter, transformationChain, consumer, loader, time,
- retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
- } else {
- log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
- throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
- }
+ static Map<String, Object> exactlyOnceSourceTaskProducerConfigs(ConnectorTaskId id,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
+ Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+ ConnectUtils.ensureProperty(
+ result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
+ "for connectors when exactly-once source support is enabled",
+ false
+ );
+ String transactionalId = taskTransactionalId(config.groupId(), id.connector(), id.task());
+ ConnectUtils.ensureProperty(
+ result, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId,
+ "for connectors when exactly-once source support is enabled",
+ true
+ );
+ return result;
}
- static Map<String, Object> producerConfigs(String connName,
+ static Map<String, Object> baseProducerConfigs(String connName,
String defaultClientId,
WorkerConfig config,
ConnectorConfig connConfig,
@@ -749,20 +802,56 @@ public class Worker {
return producerProps;
}
- static Map<String, Object> consumerConfigs(ConnectorTaskId id,
+ static Map<String, Object> exactlyOnceSourceOffsetsConsumerConfigs(String connName,
+ String defaultClientId,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
+ Map<String, Object> result = baseConsumerConfigs(
+ connName, defaultClientId, config, connConfig, connectorClass,
+ connectorClientConfigOverridePolicy, clusterId, ConnectorType.SOURCE);
+ ConnectUtils.ensureProperty(
+ result, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ "for source connectors' offset consumers when exactly-once source support is enabled",
+ false
+ );
+ return result;
+ }
+
+ static Map<String, Object> regularSourceOffsetsConsumerConfigs(String connName,
+ String defaultClientId,
+ WorkerConfig config,
+ ConnectorConfig connConfig,
+ Class<? extends Connector> connectorClass,
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
+ Map<String, Object> result = baseConsumerConfigs(
+ connName, defaultClientId, config, connConfig, connectorClass,
+ connectorClientConfigOverridePolicy, clusterId, ConnectorType.SOURCE);
+ // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyway
+ result.putIfAbsent(
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+ IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ return result;
+ }
+
+ static Map<String, Object> baseConsumerConfigs(String connName,
+ String defaultClientId,
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
- String clusterId) {
+ String clusterId,
+ ConnectorType connectorType) {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "connector-consumer-" + id);
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(connName));
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, defaultClientId);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
@@ -773,8 +862,8 @@ public class Worker {
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
// Connector-specified overrides
Map<String, Object> consumerOverrides =
- connectorClientConfigOverrides(id.connector(), connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
- ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER,
+ connectorClientConfigOverrides(connName, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+ connectorType, ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
consumerProps.putAll(consumerOverrides);
@@ -849,6 +938,11 @@ public class Worker {
return taskTransactionalId(config.groupId(), id.connector(), id.task());
}
+ /**
+ * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional ID} to use for a task that writes
+ * records and/or offsets in a transaction. Not to be confused with {@link DistributedConfig#transactionalProducerId()},
+ * which is not used by tasks at all, but instead, by the worker itself.
+ */
public static String taskTransactionalId(String groupId, String connector, int taskId) {
return String.format("%s-%s-%d", groupId, connector, taskId);
}
@@ -867,8 +961,8 @@ public class Worker {
// check if topic for dead letter queue exists
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
- Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-dlq-producer-" + id, config, connConfig, connectorClass,
- connectorClientConfigOverridePolicy, kafkaClusterId);
+ Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
Map<String, Object> adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
@@ -911,7 +1005,7 @@ public class Worker {
log.info("Stopping task {}", task.id());
if (task instanceof WorkerSourceTask)
- sourceTaskOffsetCommitter.remove(task.id());
+ sourceTaskOffsetCommitter.ifPresent(committer -> committer.remove(task.id()));
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
@@ -1069,6 +1163,268 @@ public class Worker {
return workerMetricsGroup;
}
+ abstract class TaskBuilder {
+
+ private final ConnectorTaskId id;
+ private final ClusterConfigState configState;
+ private final TaskStatus.Listener statusListener;
+ private final TargetState initialState;
+
+ private Task task = null;
+ private ConnectorConfig connectorConfig = null;
+ private Converter keyConverter = null;
+ private Converter valueConverter = null;
+ private HeaderConverter headerConverter = null;
+ private ClassLoader classLoader = null;
+
+ public TaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ this.id = id;
+ this.configState = configState;
+ this.statusListener = statusListener;
+ this.initialState = initialState;
+ }
+
+ public TaskBuilder withTask(Task task) {
+ this.task = task;
+ return this;
+ }
+
+ public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ return this;
+ }
+
+ public TaskBuilder withKeyConverter(Converter keyConverter) {
+ this.keyConverter = keyConverter;
+ return this;
+ }
+
+ public TaskBuilder withValueConverter(Converter valueConverter) {
+ this.valueConverter = valueConverter;
+ return this;
+ }
+
+ public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) {
+ this.headerConverter = headerConverter;
+ return this;
+ }
+
+ public TaskBuilder withClassloader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ return this;
+ }
+
+ public WorkerTask build() {
+ Objects.requireNonNull(task, "Task cannot be null");
+ Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null");
+ Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null");
+ Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null");
+ Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null");
+ Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
+
+ ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+ final Class<? extends Connector> connectorClass = plugins.connectorClass(
+ connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+ connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+ return doBuild(task, id, configState, statusListener, initialState,
+ connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
+ errorHandlingMetrics, connectorClass, retryWithToleranceOperator);
+ }
+
+ abstract WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator retryWithToleranceOperator);
+
+ }
+
+ class SinkTaskBuilder extends TaskBuilder {
+ public SinkTaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ super(id, configState, statusListener, initialState);
+ }
+
+ @Override
+ public WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
+
+ TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
+ SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
+ retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+ WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+ keyConverter, valueConverter, headerConverter);
+
+ Map<String, Object> consumerProps = baseConsumerConfigs(
+ id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+ return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
+ valueConverter, headerConverter, transformationChain, consumer, classLoader, time,
+ retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+ }
+ }
+
+ class SourceTaskBuilder extends TaskBuilder {
+ public SourceTaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState) {
+ super(id, configState, statusListener, initialState);
+ }
+
+ @Override
+ public WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
+
+ SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+ connectorConfig.originalsStrings(), config.topicCreationEnable());
+ retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+ TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
+
+ Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+ TopicAdmin topicAdmin;
+ Map<String, TopicCreationGroup> topicCreationGroups;
+ if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+ topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+ // Create a topic admin that the task can use for topic creation
+ Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+ sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+ topicAdmin = new TopicAdmin(adminOverrides);
+ } else {
+ topicAdmin = null;
+ topicCreationGroups = null;
+ }
+
+ // Set up the offset backing store for this task instance
+ // (This logic is implemented in a follow-up pull request: https://github.com/apache/kafka/pull/11781)
+ ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
+ offsetStore.configure(config);
+
+ CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+ OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+
+ // Note we pass the configState as it performs dynamic transformations under the covers
+ return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
+ headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
+ offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
+ retryWithToleranceOperator, herder.statusBackingStore(), executor);
+ }
+ }
+
+ class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+ private final Runnable preProducerCheck;
+ private final Runnable postProducerCheck;
+
+ public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ Runnable preProducerCheck,
+ Runnable postProducerCheck) {
+ super(id, configState, statusListener, initialState);
+ this.preProducerCheck = preProducerCheck;
+ this.postProducerCheck = postProducerCheck;
+ }
+
+ @Override
+ public WorkerTask doBuild(Task task,
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ TaskStatus.Listener statusListener,
+ TargetState initialState,
+ ConnectorConfig connectorConfig,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter,
+ ClassLoader classLoader,
+ ErrorHandlingMetrics errorHandlingMetrics,
+ Class<? extends Connector> connectorClass,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
+
+ SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+ connectorConfig.originalsStrings(), config.topicCreationEnable());
+ retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+ TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
+
+ Map<String, Object> producerProps = exactlyOnceSourceTaskProducerConfigs(
+ id, config, sourceConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+ // Create a topic admin that the task will use for its offsets topic and, potentially, automatic topic creation
+ Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+ sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+ TopicAdmin topicAdmin = new TopicAdmin(adminOverrides);
+
+ Map<String, TopicCreationGroup> topicCreationGroups;
+ if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+ topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+ } else {
+ topicCreationGroups = null;
+ }
+
+ // Set up the offset backing store for this task instance
+ // (This logic is implemented in a follow-up pull request: https://github.com/apache/kafka/pull/11781)
+ ConnectorOffsetBackingStore offsetStore = new ConnectorOffsetBackingStore(globalOffsetBackingStore, "TODO");
+ offsetStore.configure(config);
+
+ CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+ OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter);
+
+ // Note we pass the configState as it performs dynamic transformations under the covers
+ return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
+ headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
+ offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, retryWithToleranceOperator,
+ herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck);
+ }
+ }
+
static class ConnectorStatusMetricsGroup {
private final ConnectMetrics connectMetrics;
private final ConnectMetricsRegistry registry;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 09b57fd42a..dfa13103fa 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
@@ -23,6 +24,8 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnectorContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
@@ -74,7 +77,8 @@ public class WorkerConnector implements Runnable {
private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown)
private State state;
- private final OffsetStorageReader offsetStorageReader;
+ private final CloseableOffsetStorageReader offsetStorageReader;
+ private final ConnectorOffsetBackingStore offsetStore;
public WorkerConnector(String connName,
Connector connector,
@@ -82,7 +86,8 @@ public class WorkerConnector implements Runnable {
CloseableConnectorContext ctx,
ConnectMetrics metrics,
ConnectorStatus.Listener statusListener,
- OffsetStorageReader offsetStorageReader,
+ CloseableOffsetStorageReader offsetStorageReader,
+ ConnectorOffsetBackingStore offsetStore,
ClassLoader loader) {
this.connName = connName;
this.config = connectorConfig.originalsStrings();
@@ -93,6 +98,7 @@ public class WorkerConnector implements Runnable {
this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
this.statusListener = this.metrics;
this.offsetStorageReader = offsetStorageReader;
+ this.offsetStore = offsetStore;
this.pendingTargetStateChange = new AtomicReference<>();
this.pendingStateChangeCallback = new AtomicReference<>();
this.shutdownLatch = new CountDownLatch(1);
@@ -165,6 +171,7 @@ public class WorkerConnector implements Runnable {
SinkConnectorConfig.validate(config);
connector.initialize(new WorkerSinkConnectorContext());
} else {
+ Objects.requireNonNull(offsetStorageReader, "Offset reader cannot be null for source connectors");
connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader));
}
} catch (Throwable t) {
@@ -271,8 +278,10 @@ public class WorkerConnector implements Runnable {
state = State.FAILED;
statusListener.onFailure(connName, t);
} finally {
- ctx.close();
- metrics.close();
+ Utils.closeQuietly(ctx, "connector context for " + connName);
+ Utils.closeQuietly(metrics, "connector metrics for " + connName);
+ Utils.closeQuietly(offsetStorageReader, "offset reader for " + connName);
+ Utils.closeQuietly(offsetStore::stop, "offset backing store for " + connName);
}
}
@@ -281,7 +290,9 @@ public class WorkerConnector implements Runnable {
// instance is being abandoned and we won't update the status on its behalf any more
// after this since a new instance may be started soon
statusListener.onShutdown(connName);
- ctx.close();
+ Utils.closeQuietly(ctx, "connector context for " + connName);
+ // Preemptively close the offset reader in case the connector is blocked on an offset read.
+ Utils.closeQuietly(offsetStorageReader, "offset reader for " + connName);
cancelled = true;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 43bf64c6af..0b8c7fa52e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -16,27 +16,11 @@
*/
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.CumulativeSum;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
-import org.apache.kafka.connect.header.Header;
-import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
@@ -44,22 +28,19 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
-import org.apache.kafka.connect.util.TopicCreation;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@@ -68,40 +49,16 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
-class WorkerSourceTask extends WorkerTask {
+class WorkerSourceTask extends AbstractWorkerSourceTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
- private static final long SEND_FAILED_BACKOFF_MS = 100;
-
- private final WorkerConfig workerConfig;
- private final SourceTask task;
- private final ClusterConfigState configState;
- private final Converter keyConverter;
- private final Converter valueConverter;
- private final HeaderConverter headerConverter;
- private final TransformationChain<SourceRecord> transformationChain;
- private final KafkaProducer<byte[], byte[]> producer;
- private final TopicAdmin admin;
- private final CloseableOffsetStorageReader offsetReader;
- private final OffsetStorageWriter offsetWriter;
- private final Executor closeExecutor;
- private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
- private final AtomicReference<Exception> producerSendException;
- private final boolean isTopicTrackingEnabled;
- private final TopicCreation topicCreation;
-
- private List<SourceRecord> toSend;
private volatile CommittableOffsets committableOffsets;
private final SubmittedRecords submittedRecords;
- private final CountDownLatch stopRequestedLatch;
-
- private Map<String, String> taskConfig;
- private boolean started = false;
+ private final AtomicReference<Exception> producerSendException;
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -111,11 +68,12 @@ class WorkerSourceTask extends WorkerTask {
Converter valueConverter,
HeaderConverter headerConverter,
TransformationChain<SourceRecord> transformationChain,
- KafkaProducer<byte[], byte[]> producer,
+ Producer<byte[], byte[]> producer,
TopicAdmin admin,
Map<String, TopicCreationGroup> topicGroups,
CloseableOffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
+ ConnectorOffsetBackingStore offsetBackingStore,
WorkerConfig workerConfig,
ClusterConfigState configState,
ConnectMetrics connectMetrics,
@@ -125,355 +83,115 @@ class WorkerSourceTask extends WorkerTask {
StatusBackingStore statusBackingStore,
Executor closeExecutor) {
- super(id, statusListener, initialState, loader, connectMetrics,
- retryWithToleranceOperator, time, statusBackingStore);
-
- this.workerConfig = workerConfig;
- this.task = task;
- this.configState = configState;
- this.keyConverter = keyConverter;
- this.valueConverter = valueConverter;
- this.headerConverter = headerConverter;
- this.transformationChain = transformationChain;
- this.producer = producer;
- this.admin = admin;
- this.offsetReader = offsetReader;
- this.offsetWriter = offsetWriter;
- this.closeExecutor = closeExecutor;
-
- this.toSend = null;
+ super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+ new WorkerSourceTaskContext(offsetReader, id, configState, null), producer,
+ admin, topicGroups, offsetReader, offsetWriter, offsetBackingStore, workerConfig, connectMetrics, loader,
+ time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+
this.committableOffsets = CommittableOffsets.EMPTY;
this.submittedRecords = new SubmittedRecords();
- this.stopRequestedLatch = new CountDownLatch(1);
- this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.producerSendException = new AtomicReference<>();
- this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
- this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
}
@Override
- public void initialize(TaskConfig taskConfig) {
- try {
- this.taskConfig = taskConfig.originalsStrings();
- } catch (Throwable t) {
- log.error("{} Task failed initialization and will not be started.", this, t);
- onFailure(t);
- }
+ protected void prepareToInitializeTask() {
+ // No-op
}
@Override
- protected void close() {
- if (started) {
- try {
- task.stop();
- } catch (Throwable t) {
- log.warn("Could not stop task", t);
- }
- }
-
- closeProducer(Duration.ofSeconds(30));
-
- if (admin != null) {
- try {
- admin.close(Duration.ofSeconds(30));
- } catch (Throwable t) {
- log.warn("Failed to close admin client on time", t);
- }
- }
- Utils.closeQuietly(transformationChain, "transformation chain");
- Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ protected void prepareToEnterSendLoop() {
+ // No-op
}
@Override
- public void removeMetrics() {
- try {
- sourceTaskMetricsGroup.close();
- } finally {
- super.removeMetrics();
- }
+ protected void beginSendIteration() {
+ updateCommittableOffsets();
}
@Override
- public void cancel() {
- super.cancel();
- offsetReader.close();
- // We proactively close the producer here as the main work thread for the task may
- // be blocked indefinitely in a call to Producer::send if automatic topic creation is
- // not enabled on either the connector or the Kafka cluster. Closing the producer should
- // unblock it in that case and allow shutdown to proceed normally.
- // With a duration of 0, the producer's own shutdown logic should be fairly quick,
- // but closing user-pluggable classes like interceptors may lag indefinitely. So, we
- // call close on a separate thread in order to avoid blocking the herder's tick thread.
- closeExecutor.execute(() -> closeProducer(Duration.ZERO));
+ protected void prepareToPollTask() {
+ maybeThrowProducerSendException();
}
@Override
- public void stop() {
- super.stop();
- stopRequestedLatch.countDown();
+ protected void recordDropped(SourceRecord record) {
+ commitTaskRecord(record, null);
}
@Override
- protected void initializeAndStart() {
- // If we try to start the task at all by invoking initialize, then count this as
- // "started" and expect a subsequent call to the task's stop() method
- // to properly clean up any resources allocated by its initialize() or
- // start() methods. If the task throws an exception during stop(),
- // the worst thing that happens is another exception gets logged for an already-
- // failed task
- started = true;
- task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
- task.start(taskConfig);
- log.info("{} Source task finished initialization and start", this);
+ protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord
+ ) {
+ maybeThrowProducerSendException();
+ return Optional.of(submittedRecords.submit(sourceRecord));
}
@Override
- public void execute() {
- try {
- log.info("{} Executing source task", this);
- while (!isStopping()) {
- updateCommittableOffsets();
-
- if (shouldPause()) {
- onPause();
- if (awaitUnpause()) {
- onResume();
- }
- continue;
- }
-
- maybeThrowProducerSendException();
- if (toSend == null) {
- log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
- long start = time.milliseconds();
- toSend = poll();
- if (toSend != null) {
- recordPollReturned(toSend.size(), time.milliseconds() - start);
- }
- }
-
- if (toSend == null)
- continue;
- log.trace("{} About to send {} records to Kafka", this, toSend.size());
- if (!sendRecords())
- stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
- // Ignore and allow to exit.
- } finally {
- submittedRecords.awaitAllMessages(
- workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
- TimeUnit.MILLISECONDS
- );
- // It should still be safe to commit offsets since any exception would have
- // simply resulted in not getting more records but all the existing records should be ok to flush
- // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
- // to fail.
- updateCommittableOffsets();
- commitOffsets();
- }
+ protected void recordDispatched(SourceRecord record) {
+ // No-op
}
- private void closeProducer(Duration duration) {
- if (producer != null) {
- try {
- producer.close(duration);
- } catch (Throwable t) {
- log.warn("Could not close producer for {}", id, t);
- }
- }
- }
-
- private void maybeThrowProducerSendException() {
- if (producerSendException.get() != null) {
- throw new ConnectException(
- "Unrecoverable exception from producer send callback",
- producerSendException.get()
- );
- }
- }
-
- private void updateCommittableOffsets() {
- CommittableOffsets newOffsets = submittedRecords.committableOffsets();
- synchronized (this) {
- this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
- }
- }
-
- protected List<SourceRecord> poll() throws InterruptedException {
- try {
- return task.poll();
- } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
- log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
- // Do nothing. Let the framework poll whenever it's ready.
- return null;
- }
- }
-
- /**
- * Convert the source record into a producer record.
- *
- * @param record the transformed record
- * @return the producer record which can sent over to Kafka. A null is returned if the input is null or
- * if an error was encountered during any of the converter stages.
- */
- private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
- if (record == null) {
- return null;
- }
-
- RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
-
- byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
- Stage.KEY_CONVERTER, keyConverter.getClass());
-
- byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
- Stage.VALUE_CONVERTER, valueConverter.getClass());
-
- if (retryWithToleranceOperator.failed()) {
- return null;
- }
-
- return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
- ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
+ @Override
+ protected void batchDispatched() {
+ // No-op
}
- /**
- * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
- * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
- * @return true if all messages were sent, false if some need to be retried
- */
- private boolean sendRecords() {
- int processed = 0;
- recordBatch(toSend.size());
- final SourceRecordWriteCounter counter =
- toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
- for (final SourceRecord preTransformRecord : toSend) {
- maybeThrowProducerSendException();
-
- retryWithToleranceOperator.sourceRecord(preTransformRecord);
- final SourceRecord record = transformationChain.apply(preTransformRecord);
- final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
- if (producerRecord == null || retryWithToleranceOperator.failed()) {
- counter.skipRecord();
- commitTaskRecord(preTransformRecord, null);
- continue;
- }
-
- log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value());
- SubmittedRecord submittedRecord = submittedRecords.submit(record);
- try {
- maybeCreateTopic(record.topic());
- final String topic = producerRecord.topic();
- producer.send(
- producerRecord,
- (recordMetadata, e) -> {
- if (e != null) {
- if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
- log.trace("Ignoring failed record send: {} failed to send record to {}: ",
- WorkerSourceTask.this, topic, e);
- // executeFailed here allows the use of existing logging infrastructure/configuration
- retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
- preTransformRecord, e);
- commitTaskRecord(preTransformRecord, null);
- } else {
- log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
- log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
- producerSendException.compareAndSet(null, e);
- }
- } else {
- submittedRecord.ack();
- counter.completeRecord();
- log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
- WorkerSourceTask.this,
- recordMetadata.topic(), recordMetadata.partition(),
- recordMetadata.offset());
- commitTaskRecord(preTransformRecord, recordMetadata);
- if (isTopicTrackingEnabled) {
- recordActiveTopic(producerRecord.topic());
- }
- }
- });
- } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
- log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ",
- this, producerRecord.topic(), producerRecord.partition(), e);
- toSend = toSend.subList(processed, toSend.size());
- submittedRecords.removeLastOccurrence(submittedRecord);
- counter.retryRemaining();
- return false;
- } catch (ConnectException e) {
- log.warn("{} Failed to send record to topic '{}' and partition '{}' due to an unrecoverable exception: ",
- this, producerRecord.topic(), producerRecord.partition(), e);
- log.trace("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);
- throw e;
- } catch (KafkaException e) {
- throw new ConnectException("Unrecoverable exception trying to send", e);
- }
- processed++;
- }
- toSend = null;
- return true;
+ @Override
+ protected void recordSent(
+ SourceRecord sourceRecord,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ RecordMetadata recordMetadata
+ ) {
+ commitTaskRecord(sourceRecord, recordMetadata);
}
- // Due to transformations that may change the destination topic of a record (such as
- // RegexRouter) topic creation can not be batched for multiple topics
- private void maybeCreateTopic(String topic) {
- if (!topicCreation.isTopicCreationRequired(topic)) {
- log.trace("Topic creation by the connector is disabled or the topic {} was previously created." +
- "If auto.create.topics.enable is enabled on the broker, " +
- "the topic will be created with default settings", topic);
- return;
- }
- log.info("The task will send records to topic '{}' for the first time. Checking "
- + "whether topic exists", topic);
- Map<String, TopicDescription> existing = admin.describeTopics(topic);
- if (!existing.isEmpty()) {
- log.info("Topic '{}' already exists.", topic);
- topicCreation.addTopic(topic);
- return;
- }
-
- log.info("Creating topic '{}'", topic);
- TopicCreationGroup topicGroup = topicCreation.findFirstGroup(topic);
- log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
- NewTopic newTopic = topicGroup.newTopic(topic);
-
- TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
- if (response.isCreated(newTopic.name())) {
- topicCreation.addTopic(topic);
- log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
- } else if (response.isExisting(newTopic.name())) {
- topicCreation.addTopic(topic);
- log.info("Found existing topic '{}'", newTopic);
+ @Override
+ protected void producerSendFailed(
+ boolean synchronous,
+ ProducerRecord<byte[], byte[]> producerRecord,
+ SourceRecord preTransformRecord,
+ Exception e
+ ) {
+ if (synchronous) {
+ throw new ConnectException("Unrecoverable exception trying to send", e);
+ }
+
+ String topic = producerRecord.topic();
+ if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
+ log.trace(
+ "Ignoring failed record send: {} failed to send record to {}: ",
+ WorkerSourceTask.this,
+ topic,
+ e
+ );
+ // executeFailed here allows the use of existing logging infrastructure/configuration
+ retryWithToleranceOperator.executeFailed(
+ Stage.KAFKA_PRODUCE,
+ WorkerSourceTask.class,
+ preTransformRecord,
+ e
+ );
+ commitTaskRecord(preTransformRecord, null);
} else {
- // The topic still does not exist and could not be created, so treat it as a task failure
- log.warn("Request to create new topic '{}' failed", topic);
- throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure "
- + "that the task is authorized to create topics or that the topic exists and "
- + "restart the task");
- }
- }
-
- private RecordHeaders convertHeaderFor(SourceRecord record) {
- Headers headers = record.headers();
- RecordHeaders result = new RecordHeaders();
- if (headers != null) {
- String topic = record.topic();
- for (Header header : headers) {
- String key = header.key();
- byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
- result.add(key, rawHeader);
- }
+ log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+ log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+ producerSendException.compareAndSet(null, e);
}
- return result;
}
- private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
- try {
- task.commitRecord(record, metadata);
- } catch (Throwable t) {
- log.error("{} Exception thrown while calling task.commitRecord()", this, t);
- }
+ @Override
+ protected void finalOffsetCommit(boolean failed) {
+ // It should still be safe to commit offsets since any exception would have
+ // simply resulted in not getting more records but all the existing records should be ok to flush
+ // and commit offsets. Worst case, task.commit() will also throw an exception causing the offset
+ // commit to fail.
+ submittedRecords.awaitAllMessages(
+ workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
+ TimeUnit.MILLISECONDS
+ );
+ updateCommittableOffsets();
+ commitOffsets();
}
public boolean commitOffsets() {
@@ -510,8 +228,8 @@ class WorkerSourceTask extends WorkerTask {
committableOffsets.largestDequeSize()
);
} else {
- log.debug("{} There are currently no pending messages for this offset commit; "
- + "all messages dispatched to the task's producer since the last commit have been acknowledged",
+ log.debug("{} There are currently no pending messages for this offset commit; "
+ + "all messages dispatched to the task's producer since the last commit have been acknowledged",
this
);
}
@@ -582,11 +300,19 @@ class WorkerSourceTask extends WorkerTask {
return true;
}
- private void commitSourceTask() {
- try {
- this.task.commit();
- } catch (Throwable t) {
- log.error("{} Exception thrown while calling task.commit()", this, t);
+ private void updateCommittableOffsets() {
+ CommittableOffsets newOffsets = submittedRecords.committableOffsets();
+ synchronized (this) {
+ this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
+ }
+ }
+
+ private void maybeThrowProducerSendException() {
+ if (producerSendException.get() != null) {
+ throw new ConnectException(
+ "Unrecoverable exception from producer send callback",
+ producerSendException.get()
+ );
}
}
@@ -597,101 +323,4 @@ class WorkerSourceTask extends WorkerTask {
'}';
}
- protected void recordPollReturned(int numRecordsInBatch, long duration) {
- sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
- }
-
- SourceTaskMetricsGroup sourceTaskMetricsGroup() {
- return sourceTaskMetricsGroup;
- }
-
- static class SourceRecordWriteCounter {
- private final SourceTaskMetricsGroup metricsGroup;
- private final int batchSize;
- private boolean completed = false;
- private int counter;
- public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
- assert batchSize > 0;
- assert metricsGroup != null;
- this.batchSize = batchSize;
- counter = batchSize;
- this.metricsGroup = metricsGroup;
- }
- public void skipRecord() {
- if (counter > 0 && --counter == 0) {
- finishedAllWrites();
- }
- }
- public void completeRecord() {
- if (counter > 0 && --counter == 0) {
- finishedAllWrites();
- }
- }
- public void retryRemaining() {
- finishedAllWrites();
- }
- private void finishedAllWrites() {
- if (!completed) {
- metricsGroup.recordWrite(batchSize - counter);
- completed = true;
- }
- }
- }
-
- static class SourceTaskMetricsGroup {
- private final MetricGroup metricGroup;
- private final Sensor sourceRecordPoll;
- private final Sensor sourceRecordWrite;
- private final Sensor sourceRecordActiveCount;
- private final Sensor pollTime;
- private int activeRecordCount;
-
- public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
- ConnectMetricsRegistry registry = connectMetrics.registry();
- metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
- registry.connectorTagName(), id.connector(),
- registry.taskTagName(), Integer.toString(id.task()));
- // remove any previously created metrics in this group to prevent collisions.
- metricGroup.close();
-
- sourceRecordPoll = metricGroup.sensor("source-record-poll");
- sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
- sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());
-
- sourceRecordWrite = metricGroup.sensor("source-record-write");
- sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
- sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());
-
- pollTime = metricGroup.sensor("poll-batch-time");
- pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
- pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
-
- sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
- sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
- sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
- sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
- }
-
- void close() {
- metricGroup.close();
- }
-
- void recordPoll(int batchSize, long duration) {
- sourceRecordPoll.record(batchSize);
- pollTime.record(duration);
- activeRecordCount += batchSize;
- sourceRecordActiveCount.record(activeRecordCount);
- }
-
- void recordWrite(int recordCount) {
- sourceRecordWrite.record(recordCount);
- activeRecordCount -= recordCount;
- activeRecordCount = Math.max(0, activeRecordCount);
- sourceRecordActiveCount.record(activeRecordCount);
- }
-
- protected MetricGroup metricGroup() {
- return metricGroup;
- }
- }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
index 5c1451fff1..d58e98e057 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
@@ -19,30 +19,39 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.Map;
public class WorkerSourceTaskContext implements SourceTaskContext {
private final OffsetStorageReader reader;
- private final WorkerSourceTask task;
+ private final ConnectorTaskId id;
private final ClusterConfigState configState;
+ private final WorkerTransactionContext transactionContext;
public WorkerSourceTaskContext(OffsetStorageReader reader,
- WorkerSourceTask task,
- ClusterConfigState configState) {
+ ConnectorTaskId id,
+ ClusterConfigState configState,
+ WorkerTransactionContext transactionContext) {
this.reader = reader;
- this.task = task;
+ this.id = id;
this.configState = configState;
+ this.transactionContext = transactionContext;
}
@Override
public Map<String, String> configs() {
- return configState.taskConfig(task.id());
+ return configState.taskConfig(id);
}
@Override
public OffsetStorageReader offsetStorageReader() {
return reader;
}
+
+ @Override
+ public WorkerTransactionContext transactionContext() {
+ return transactionContext;
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 0d893f56ee..072e4b34a1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -55,7 +55,7 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final TaskStatus.Listener statusListener;
protected final ClassLoader loader;
- protected final StatusBackingStore statusBackingStore;
+ private final StatusBackingStore statusBackingStore;
protected final Time time;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final TaskMetricsGroup taskMetricsGroup;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java
new file mode 100644
index 0000000000..fde3f6944f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.TransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A {@link TransactionContext} that can be given to tasks and then queried by the worker to check on
+ * requests to abort and commit transactions. This class is thread safe and is designed to accommodate
+ * concurrent use without external synchronization: every public method is synchronized on this instance.
+ */
+public class WorkerTransactionContext implements TransactionContext {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerTransactionContext.class);
+
+ private final Set<SourceRecord> commitableRecords = new HashSet<>(); // records passed to commitTransaction(SourceRecord) and not yet consumed by shouldCommitOn
+ private final Set<SourceRecord> abortableRecords = new HashSet<>(); // records passed to abortTransaction(SourceRecord) and not yet consumed by shouldAbortOn
+ private boolean batchCommitRequested = false; // set by commitTransaction(), cleared when read by shouldCommitBatch()
+ private boolean batchAbortRequested = false; // set by abortTransaction(), cleared when read by shouldAbortBatch()
+
+ @Override
+ public synchronized void commitTransaction() {
+ batchCommitRequested = true; // only records the request; nothing is validated or committed here
+ }
+
+ @Override
+ public synchronized void commitTransaction(SourceRecord record) {
+ Objects.requireNonNull(record, "Source record used to define transaction boundaries may not be null");
+ commitableRecords.add(record); // a Set, so requesting the same record twice has no extra effect
+ }
+
+ @Override
+ public synchronized void abortTransaction() {
+ batchAbortRequested = true; // only records the request; conflicts are detected later in the should*Batch() methods
+ }
+
+ @Override
+ public synchronized void abortTransaction(SourceRecord record) {
+ Objects.requireNonNull(record, "Source record used to define transaction boundaries may not be null");
+ abortableRecords.add(record); // conflicts with a commit request for the same record are detected in shouldCommitOn/shouldAbortOn
+ }
+
+ public synchronized boolean shouldCommitBatch() {
+ checkBatchRequestsConsistency(); // throws IllegalStateException if a batch commit and a batch abort are both pending
+ boolean result = batchCommitRequested;
+ batchCommitRequested = false; // read-and-reset: a pending request is reported exactly once
+ return result;
+ }
+
+ public synchronized boolean shouldAbortBatch() {
+ checkBatchRequestsConsistency(); // same check as in shouldCommitBatch(); the flags are left untouched when it throws
+ boolean result = batchAbortRequested;
+ batchAbortRequested = false; // read-and-reset: a pending request is reported exactly once
+ return result;
+ }
+
+ public synchronized boolean shouldCommitOn(SourceRecord record) {
+ // We could perform this check in the connector-facing methods (such as commitTransaction(SourceRecord)),
+ // but the connector might swallow that exception.
+ // This way, we can fail the task unconditionally, which is warranted since the alternative may lead to data loss.
+ // Essentially, instead of telling the task that it screwed up and trusting it to do the right thing, we rat on it to the
+ // worker and let it get punished accordingly.
+ checkRecordRequestConsistency(record);
+ return commitableRecords.remove(record); // true only if a commit was requested for this record; the request is consumed
+ }
+
+ public synchronized boolean shouldAbortOn(SourceRecord record) {
+ checkRecordRequestConsistency(record); // same deferred check as in shouldCommitOn
+ return abortableRecords.remove(record); // true only if an abort was requested for this record; the request is consumed
+ }
+
+ private void checkBatchRequestsConsistency() {
+ if (batchCommitRequested && batchAbortRequested) {
+ throw new IllegalStateException("Connector requested both commit and abort of same transaction");
+ }
+ }
+
+ private void checkRecordRequestConsistency(SourceRecord record) {
+ if (commitableRecords.contains(record) && abortableRecords.contains(record)) {
+ log.trace("Connector will fail as it has requested both commit and abort of transaction for same record: {}", record);
+ throw new IllegalStateException(String.format(
+ "Connector requested both commit and abort of same record against topic/partition %s/%s",
+ record.topic(), record.kafkaPartition()
+ ));
+ }
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8d269a3649..ded833da59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1699,14 +1699,56 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private boolean startTask(ConnectorTaskId taskId) {
log.info("Starting task {}", taskId);
- return worker.startTask(
- taskId,
- configState,
- configState.connectorConfig(taskId.connector()),
- configState.taskConfig(taskId),
- this,
- configState.targetState(taskId.connector())
- );
+ Map<String, String> connProps = configState.connectorConfig(taskId.connector()); // fetched once; used for type detection and passed on to the worker
+ switch (connectorTypeForConfig(connProps)) { // pick the worker entry point that matches the connector type
+ case SINK:
+ return worker.startSinkTask(
+ taskId,
+ configState,
+ connProps,
+ configState.taskConfig(taskId),
+ this,
+ configState.targetState(taskId.connector())
+ );
+ case SOURCE:
+ if (config.exactlyOnceSourceEnabled()) { // exactly-once path is chosen by the worker-level config, not per connector
+ int taskGeneration = configState.taskConfigGeneration(taskId.connector()); // captured now, checked later by the second callback below
+ return worker.startExactlyOnceSourceTask(
+ taskId,
+ configState,
+ connProps,
+ configState.taskConfig(taskId),
+ this,
+ configState.targetState(taskId.connector()),
+ () -> { // first callback: requests zombie fencing for this task and waits for the result
+ FutureCallback<Void> preflightFencing = new FutureCallback<>();
+ fenceZombieSourceTasks(taskId, preflightFencing);
+ try {
+ preflightFencing.get(); // blocks the calling thread until the fencing callback completes
+ } catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored before rethrowing
+ throw new ConnectException("Interrupted while attempting to perform round of zombie fencing", e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause(); // unwrap so the underlying failure is what gets reported
+ throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing");
+ }
+ },
+ () -> { // second callback: re-checks the generation captured above together with task ownership
+ verifyTaskGenerationAndOwnership(taskId, taskGeneration);
+ }
+ );
+ } else {
+ return worker.startSourceTask(
+ taskId,
+ configState,
+ connProps,
+ configState.taskConfig(taskId),
+ this,
+ configState.targetState(taskId.connector())
+ );
+ }
+ default: // any connector type other than SINK or SOURCE
+ throw new ConnectException("Failed to start task " + taskId + " since it is not a recognizable type (source or sink)");
+ }
}
private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index f9e812610c..8afe3c7b3d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -280,9 +280,8 @@ public class StandaloneHerder extends AbstractHerder {
cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
Map<String, String> connConfigProps = configState.connectorConfig(taskId.connector());
- TargetState targetState = configState.targetState(taskId.connector());
worker.stopAndAwaitTask(taskId);
- if (worker.startTask(taskId, configState, connConfigProps, taskConfigProps, this, targetState))
+ if (startTask(taskId, connConfigProps))
cb.onCompletion(null, null);
else
cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
@@ -377,11 +376,34 @@ public class StandaloneHerder extends AbstractHerder {
}
private void createConnectorTasks(String connName, Collection<ConnectorTaskId> taskIds) {
- TargetState initialState = configState.targetState(connName);
Map<String, String> connConfigs = configState.connectorConfig(connName);
for (ConnectorTaskId taskId : taskIds) {
- Map<String, String> taskConfigMap = configState.taskConfig(taskId);
- worker.startTask(taskId, configState, connConfigs, taskConfigMap, this, initialState);
+ startTask(taskId, connConfigs); // the boolean result (whether the task started) is ignored here
+ }
+ }
+
+ private boolean startTask(ConnectorTaskId taskId, Map<String, String> connProps) {
+ switch (connectorTypeForClass(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) { // type is derived from the configured connector class
+ case SINK:
+ return worker.startSinkTask(
+ taskId,
+ configState,
+ connProps,
+ configState.taskConfig(taskId), // task config and target state are read from configState at call time
+ this,
+ configState.targetState(taskId.connector())
+ );
+ case SOURCE:
+ return worker.startSourceTask( // this method has no exactly-once branch; source tasks always start through startSourceTask
+ taskId,
+ configState,
+ connProps,
+ configState.taskConfig(taskId),
+ this,
+ configState.targetState(taskId.connector())
+ );
+ default: // any connector type other than SINK or SOURCE
+ throw new ConnectException("Failed to start task " + taskId + " since it is not a recognizable type (source or sink)");
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
new file mode 100644
index 0000000000..8f0f068bc0
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class ConnectorOffsetBackingStore implements OffsetBackingStore {
+
+ private final OffsetBackingStore workerStore; // every read and write in this class is delegated to this store
+ private final String primaryOffsetsTopic; // only exposed through primaryOffsetsTopic(); not used for reads or writes here
+
+ public ConnectorOffsetBackingStore(
+ OffsetBackingStore workerStore,
+ String primaryOffsetsTopic
+ ) {
+ this.workerStore = workerStore;
+ this.primaryOffsetsTopic = primaryOffsetsTopic;
+ }
+
+ public String primaryOffsetsTopic() {
+ return primaryOffsetsTopic;
+ }
+
+ @Override
+ public void start() {
+ // TODO: not implemented yet; currently a no-op (the wrapped store is not started from here)
+ }
+
+ @Override
+ public void stop() {
+ // TODO: not implemented yet; currently a no-op (the wrapped store is not stopped from here)
+ }
+
+ @Override
+ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
+ // TODO: for now this simply delegates to the worker store
+ return workerStore.get(keys);
+ }
+
+ @Override
+ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
+ // TODO: for now this simply delegates to the worker store
+ return workerStore.set(values, callback);
+ }
+
+ @Override
+ public void configure(WorkerConfig config) {
+ // TODO: not implemented yet; the given config is currently ignored
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index a1eea43103..49b9d69d8e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -141,6 +141,7 @@ public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
return result;
}
+ @Override
public void close() {
if (!closed.getAndSet(true)) {
synchronized (offsetReadFutures) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 7766e2cf5d..b67e3d7b1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -89,8 +89,9 @@ public class OffsetStorageWriter {
* @param partition the partition to store an offset for
* @param offset the offset
*/
- public synchronized void offset(Map<String, Object> partition, Map<String, Object> offset) {
- data.put(partition, offset);
+ @SuppressWarnings("unchecked")
+ public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
+ data.put((Map<String, Object>) partition, (Map<String, Object>) offset); // unchecked casts from Map<String, ?>; the maps are stored as given, not copied
}
private boolean flushing() {
@@ -113,12 +114,18 @@ public class OffsetStorageWriter {
if (data.isEmpty())
return false;
- assert !flushing();
toFlush = data;
data = new HashMap<>();
return true;
}
+ /**
+ * @return whether there's anything to flush right now, i.e. whether any offsets are currently buffered.
+ */
+ public synchronized boolean willFlush() {
+ return !data.isEmpty(); // looks only at buffered offsets, not at ones already moved to toFlush
+ }
+
/**
* Flush the current offsets and clear them from this writer. This is non-blocking: it
* moves the current set of offsets out of the way, serializes the data, and asynchronously
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
new file mode 100644
index 0000000000..d0833dbffc
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -0,0 +1,842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@PowerMockIgnore({"javax.management.*",
+ "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+public class AbstractWorkerSourceTaskTest {
+
+ private static final String TOPIC = "topic";
+ private static final String OTHER_TOPIC = "other-topic";
+ private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+ private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
+
+ // Connect-format data
+ private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+ private static final Integer KEY = -1;
+ private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+ private static final Long RECORD = 12L;
+ // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
+ // is used in the right place.
+ private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+ private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
+
+ @Mock private SourceTask sourceTask;
+ @Mock private TopicAdmin admin;
+ @Mock private KafkaProducer<byte[], byte[]> producer;
+ @Mock private Converter keyConverter;
+ @Mock private Converter valueConverter;
+ @Mock private HeaderConverter headerConverter;
+ @Mock private TransformationChain<SourceRecord> transformationChain;
+ @Mock private CloseableOffsetStorageReader offsetReader;
+ @Mock private OffsetStorageWriter offsetWriter;
+ @Mock private ConnectorOffsetBackingStore offsetStore;
+ @Mock private StatusBackingStore statusBackingStore;
+ @Mock private WorkerSourceTaskContext sourceTaskContext;
+ @MockStrict private TaskStatus.Listener statusListener;
+
+ private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+
+ private Plugins plugins;
+ private WorkerConfig config;
+ private SourceConnectorConfig sourceConfig;
+ private MockConnectMetrics metrics = new MockConnectMetrics();
+ private Capture<Callback> producerCallbacks;
+
+ private AbstractWorkerSourceTask workerTask;
+
+ @Before
+ public void setup() {
+ Map<String, String> workerProps = workerProps();
+ plugins = new Plugins(workerProps);
+ config = new StandaloneConfig(workerProps);
+ sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true);
+ producerCallbacks = EasyMock.newCapture();
+ metrics = new MockConnectMetrics(); // NOTE(review): replaces the instance created by the field initializer without stopping it
+ }
+
+ private Map<String, String> workerProps() {
+ Map<String, String> props = new HashMap<>(); // a fresh mutable map is built on every call
+ props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ props.put(TOPIC_CREATION_ENABLE_CONFIG, "true"); // topic creation is switched on at the worker level for these tests
+ return props;
+ }
+
+ private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
+ // set up props for the source connector
+ Map<String, String> props = new HashMap<>();
+ props.put("name", "foo-connector");
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar")); // two topic creation groups, configured below
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, topic); // group "foo": include pattern is the given topic
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*"); // group "bar": include pattern matches everything...
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, topic); // ...with the given topic as its exclude pattern
+ return props;
+ }
+
+ @After
+ public void tearDown() {
+ if (metrics != null) metrics.stop(); // stops the MockConnectMetrics instance created in setup()
+ }
+
+ @Test
+ public void testMetricsGroup() {
+ AbstractWorkerSourceTask.SourceTaskMetricsGroup group = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId, metrics);
+ AbstractWorkerSourceTask.SourceTaskMetricsGroup group1 = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);
+ for (int i = 0; i != 10; ++i) {
+ group.recordPoll(100, 1000 + i * 100); // second argument runs 1000, 1100, ..., 1900
+ group.recordWrite(10);
+ }
+ for (int i = 0; i != 20; ++i) {
+ group1.recordPoll(100, 1000 + i * 100); // second argument runs 1000, 1100, ..., 2900
+ group1.recordWrite(10);
+ }
+ assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d); // largest value passed to recordPoll above
+ assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); // mean of 1000..1900
+ assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
+ assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d); // 10 calls x 100
+ assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+ assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d); // 10 calls x 10
+ assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d); // matches 1000 polled - 100 written
+
+ // Close the group
+ group.close();
+
+ for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
+ // Metrics for this group should no longer exist
+ assertFalse(group.metricGroup().groupId().includes(metricName));
+ }
+ // Sensors for this group should no longer exist
+ // NOTE(review): several names below say "sink-record-*" although this is a source-task metrics group - confirm they are the intended sensor names
+ assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+ assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+ assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+ assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+ assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+ assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+ assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+ assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+ assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d); // group1 is still readable after group was closed
+ assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); // mean of 1000..2900
+ assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
+ assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d); // 20 calls x 100
+ assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+ assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d); // 20 calls x 10
+ assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d); // matches 2000 polled - 200 written
+ }
+
+ @Test
+ public void testSendRecordsConvertsData() {
+ createWorkerTask();
+
+ List<SourceRecord> records = new ArrayList<>();
+ // Can just use the same record for key and value
+ records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+ expectTopicCreation(TOPIC);
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = records; // the batch is injected directly into the task's toSend field
+ workerTask.sendRecords();
+ assertEquals(SERIALIZED_KEY, sent.getValue().key()); // assertEquals on byte[] compares references, so this expects the very same array instance
+ assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSendRecordsPropagatesTimestamp() {
+ final Long timestamp = System.currentTimeMillis(); // any valid (non-negative) timestamp works; the current time is used
+
+ createWorkerTask();
+
+ List<SourceRecord> records = Collections.singletonList(
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+ );
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+ expectTopicCreation(TOPIC);
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = records;
+ workerTask.sendRecords();
+ assertEquals(timestamp, sent.getValue().timestamp()); // the source record's timestamp must reach the producer record unchanged
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSendRecordsCorruptTimestamp() {
+ final Long timestamp = -3L; // a negative timestamp that sendRecords is expected to reject
+ createWorkerTask();
+
+ List<SourceRecord> records = Collections.singletonList(
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+ );
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+ PowerMock.replayAll(); // unlike the neighbouring tests, no topic creation is expected here
+
+ workerTask.toSend = records;
+ assertThrows(InvalidRecordException.class, workerTask::sendRecords);
+ assertFalse(sent.hasCaptured()); // nothing may have been handed to the producer
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSendRecordsNoTimestamp() {
+ final Long timestamp = -1L; // unlike -3 in the corrupt-timestamp test, -1 is expected to be accepted
+ createWorkerTask();
+
+ List<SourceRecord> records = Collections.singletonList(
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+ );
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+ expectTopicCreation(TOPIC);
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = records;
+ workerTask.sendRecords();
+ assertNull(sent.getValue().timestamp()); // the producer record is expected to carry no timestamp at all
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testHeaders() {
+ Headers headers = new RecordHeaders(); // Kafka-side headers expected on the produced record
+ headers.add("header_key", "header_value".getBytes());
+
+ org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders(); // Connect-side headers attached to the source record
+ connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
+
+ createWorkerTask();
+
+ List<SourceRecord> records = new ArrayList<>();
+ records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
+
+ expectTopicCreation(TOPIC);
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, headers);
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = records;
+ workerTask.sendRecords();
+ assertEquals(SERIALIZED_KEY, sent.getValue().key());
+ assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+ assertEquals(headers, sent.getValue().headers()); // the Connect header must arrive as the equivalent Kafka header
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testHeadersWithCustomConverter() throws Exception {
+ StringConverter stringConverter = new StringConverter();
+ SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
+
+ createWorkerTask(stringConverter, testConverter, stringConverter); // real converters instead of the mocked ones
+
+ List<SourceRecord> records = new ArrayList<>();
+
+ String stringA = "Árvíztűrő tükörfúrógép";
+ org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
+ String encodingA = "latin2";
+ headersA.addString("encoding", encodingA); // the charset name travels in an "encoding" header
+
+ records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
+
+ String stringB = "Тестовое сообщение";
+ org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
+ String encodingB = "koi8_r";
+ headersB.addString("encoding", encodingB); // a different charset for the second record
+
+ records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
+
+ expectTopicCreation(TOPIC);
+
+ Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, null);
+ Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, null);
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = records;
+ workerTask.sendRecords();
+
+ assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key())); // wrapped in ByteBuffer so contents, not array references, are compared
+ assertEquals(
+ ByteBuffer.wrap(stringA.getBytes(encodingA)),
+ ByteBuffer.wrap(sentRecordA.getValue().value())
+ );
+ assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
+
+ assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
+ assertEquals(
+ ByteBuffer.wrap(stringB.getBytes(encodingB)),
+ ByteBuffer.wrap(sentRecordB.getValue().value())
+ );
+ assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testTopicCreateWhenTopicExists() {
+ createWorkerTask();
+
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectPreliminaryCalls();
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+ EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc)); // describe reports the topic as existing; no create call is expected
+
+ expectSendRecord();
+ expectSendRecord();
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = Arrays.asList(record1, record2);
+ workerTask.sendRecords(); // NOTE(review): no PowerMock.verifyAll() follows, unlike the tests above
+ }
+
+ /**
+  * A retriable failure while describing the topic must leave the whole batch in
+  * {@code workerTask.toSend}; a second {@code sendRecords()} call then clears it.
+  */
+ @Test
+ public void testSendRecordsTopicDescribeRetries() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final List<SourceRecord> batch = Arrays.asList(firstRecord, secondRecord);
+
+     expectPreliminaryCalls();
+
+     // Attempt 1: describing the topic fails with a retriable timeout
+     EasyMock.expect(admin.describeTopics(TOPIC))
+             .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+     // Attempt 2: describe and create both succeed, then each record is sent once
+     expectTopicCreation(TOPIC);
+     expectSendRecord();
+     expectSendRecord();
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = batch;
+     workerTask.sendRecords();
+     // Both records are still pending after the failed attempt
+     assertEquals(batch, workerTask.toSend);
+
+     // The retry drains the batch
+     workerTask.sendRecords();
+     assertNull(workerTask.toSend);
+ }
+
+ /**
+  * A retriable failure while creating the topic must leave the whole batch in
+  * {@code workerTask.toSend}; a second {@code sendRecords()} call then clears it.
+  */
+ @Test
+ public void testSendRecordsTopicCreateRetries() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final List<SourceRecord> batch = Arrays.asList(firstRecord, secondRecord);
+
+     // Attempt 1: describe reports no topic, and the subsequent create call times out
+     expectPreliminaryCalls();
+     EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+     Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
+             .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+     // Attempt 2: describe and create both succeed, then each record is sent once
+     expectTopicCreation(TOPIC);
+     expectSendRecord();
+     expectSendRecord();
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = batch;
+     workerTask.sendRecords();
+     // Both records are still pending after the failed attempt
+     assertEquals(batch, workerTask.toSend);
+
+     // The retry drains the batch
+     workerTask.sendRecords();
+     assertNull(workerTask.toSend);
+ }
+
+ /**
+  * Sends a batch of three records where the third targets {@code OTHER_TOPIC} and the first
+  * describe call for that topic fails with a retriable error. Only the third record must remain
+  * in {@code workerTask.toSend} afterwards, and a retry must clear it.
+  */
+ @Test
+ public void testSendRecordsTopicDescribeRetriesMidway() {
+     createWorkerTask();
+
+     // All records share key, value and schemas so conversion expectations can be reused;
+     // they differ by Kafka partition, and the last one also by topic
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord otherTopicRecord = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     // Round 1: the two TOPIC records go through
+     expectPreliminaryCalls(OTHER_TOPIC);
+     expectTopicCreation(TOPIC);
+     expectSendRecord();
+     expectSendRecord();
+
+     // ...but describing OTHER_TOPIC times out
+     EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
+             .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+     // Round 2: OTHER_TOPIC is created and the remaining record is sent
+     expectTopicCreation(OTHER_TOPIC);
+     expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+
+     PowerMock.replayAll();
+
+     // Submit all three; the first two are dispatched and only the third is retained
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord, otherTopicRecord);
+     workerTask.sendRecords();
+     assertEquals(Collections.singletonList(otherTopicRecord), workerTask.toSend);
+
+     // The retry drains what was left
+     workerTask.sendRecords();
+     assertNull(workerTask.toSend);
+
+     PowerMock.verifyAll();
+ }
+
+ /**
+  * Sends a batch of three records where the third targets {@code OTHER_TOPIC} and the first
+  * attempt to create that topic fails with a retriable error. Only the third record must remain
+  * in {@code workerTask.toSend} afterwards, and a retry must clear it.
+  */
+ @Test
+ public void testSendRecordsTopicCreateRetriesMidway() {
+     createWorkerTask();
+
+     // All records share key, value and schemas so conversion expectations can be reused;
+     // they differ by Kafka partition, and the last one also by topic
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord otherTopicRecord = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     // Round 1: the two TOPIC records go through
+     expectPreliminaryCalls(OTHER_TOPIC);
+     expectTopicCreation(TOPIC);
+     expectSendRecord();
+     expectSendRecord();
+
+     // ...then OTHER_TOPIC is reported missing and the create call times out
+     EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
+     Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
+             .andThrow(new RetriableException(new TimeoutException("timeout")));
+
+     // Round 2: OTHER_TOPIC is created and the remaining record is sent
+     expectTopicCreation(OTHER_TOPIC);
+     expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+
+     PowerMock.replayAll();
+
+     // Submit all three; the first two are dispatched and only the third is retained
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord, otherTopicRecord);
+     workerTask.sendRecords();
+     assertEquals(Collections.singletonList(otherTopicRecord), workerTask.toSend);
+
+     // The retry drains what was left
+     workerTask.sendRecords();
+     assertNull(workerTask.toSend);
+
+     PowerMock.verifyAll();
+ }
+
+ /**
+  * A {@link ConnectException} thrown by {@code admin.describeTopics} must surface from
+  * {@code sendRecords()}.
+  */
+ @Test
+ public void testTopicDescribeFails() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     expectPreliminaryCalls();
+     final ConnectException describeFailure = new ConnectException(new TopicAuthorizationException("unauthorized"));
+     EasyMock.expect(admin.describeTopics(TOPIC)).andThrow(describeFailure);
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord);
+     assertThrows(ConnectException.class, () -> workerTask.sendRecords());
+ }
+
+ /**
+  * A {@link ConnectException} thrown by {@code admin.createOrFindTopics} must surface from
+  * {@code sendRecords()}, and the create call must actually have been attempted.
+  */
+ @Test
+ public void testTopicCreateFails() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     expectPreliminaryCalls();
+     // Describe finds nothing, which leads to a create attempt
+     EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+     final Capture<NewTopic> requestedTopic = EasyMock.newCapture();
+     final ConnectException createFailure = new ConnectException(new TopicAuthorizationException("unauthorized"));
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic))).andThrow(createFailure);
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord);
+     assertThrows(ConnectException.class, () -> workerTask.sendRecords());
+     assertTrue(requestedTopic.hasCaptured());
+ }
+
+ /**
+  * When {@code admin.createOrFindTopics} answers with {@code TopicAdmin.EMPTY_CREATION},
+  * {@code sendRecords()} must fail with a {@link ConnectException}.
+  */
+ @Test
+ public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     expectPreliminaryCalls();
+     // Describe finds nothing, which leads to a create attempt
+     EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+     final Capture<NewTopic> requestedTopic = EasyMock.newCapture();
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic)))
+             .andReturn(TopicAdmin.EMPTY_CREATION);
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord);
+     assertThrows(ConnectException.class, () -> workerTask.sendRecords());
+     assertTrue(requestedTopic.hasCaptured());
+ }
+
+ /**
+  * When {@code admin.createOrFindTopics} reports the topic as already existing
+  * (see {@code foundTopic}), both records are expected to be sent.
+  */
+ @Test
+ public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     expectPreliminaryCalls();
+     // Describe finds nothing, which leads to a create attempt
+     EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+     final Capture<NewTopic> requestedTopic = EasyMock.newCapture();
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic)))
+             .andReturn(foundTopic(TOPIC));
+
+     // One send expectation per record in the batch
+     expectSendRecord();
+     expectSendRecord();
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord);
+     workerTask.sendRecords();
+ }
+
+ /**
+  * When {@code admin.createOrFindTopics} reports the topic as newly created
+  * (see {@code createdTopic}), both records are expected to be sent.
+  */
+ @Test
+ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
+     createWorkerTask();
+
+     final SourceRecord firstRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+     final SourceRecord secondRecord = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+     expectPreliminaryCalls();
+     // Describe finds nothing, which leads to a create attempt
+     EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+     final Capture<NewTopic> requestedTopic = EasyMock.newCapture();
+     EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic)))
+             .andReturn(createdTopic(TOPIC));
+
+     // One send expectation per record in the batch
+     expectSendRecord();
+     expectSendRecord();
+
+     PowerMock.replayAll();
+
+     workerTask.toSend = Arrays.asList(firstRecord, secondRecord);
+     workerTask.sendRecords();
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
+ String topic,
+ boolean anyTimes,
+ Headers headers
+ ) {
+ if (headers != null)
+ expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+
+ expectApplyTransformationChain(anyTimes);
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
+
+ IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
+ producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
+
+ IAnswer<Future<RecordMetadata>> expectResponse = () -> {
+ synchronized (producerCallbacks) {
+ for (Callback cb : producerCallbacks.getValues()) {
+ cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
+ }
+ producerCallbacks.reset();
+ }
+ return null;
+ };
+
+ if (anyTimes)
+ expect.andStubAnswer(expectResponse);
+ else
+ expect.andAnswer(expectResponse);
+
+ expectTaskGetTopic(anyTimes);
+
+ return sent;
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
+ return expectSendRecord(TOPIC, true, emptyHeaders());
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
+ return expectSendRecord(TOPIC, false, emptyHeaders());
+ }
+
+ /**
+  * Records an expectation for {@code statusBackingStore.getTopic}, answered with a
+  * {@link TopicStatus} built from whatever connector name and topic the caller passes in.
+  *
+  * @param anyTimes if true, the answer is registered as a stub; otherwise as a one-shot expectation
+  */
+ private void expectTaskGetTopic(boolean anyTimes) {
+     final Capture<String> connectorCapture = EasyMock.newCapture();
+     final Capture<String> topicCapture = EasyMock.newCapture();
+     IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
+             EasyMock.capture(connectorCapture),
+             EasyMock.capture(topicCapture)));
+
+     // Echo the captured arguments back as a TopicStatus for task 0 of the captured connector
+     final IAnswer<TopicStatus> echoCapturedArguments = () -> new TopicStatus(
+             topicCapture.getValue(),
+             new ConnectorTaskId(connectorCapture.getValue(), 0),
+             Time.SYSTEM.milliseconds());
+
+     if (anyTimes) {
+         expect.andStubAnswer(echoCapturedArguments);
+     } else {
+         expect.andAnswer(echoCapturedArguments);
+     }
+
+     // An earlier version asserted on the captured connector/topic at this point. Both captures
+     // are created at the top of this method and only the expectation has been recorded so far,
+     // so hasCaptured() was always false and those assertions never ran; they have been removed.
+ }
+
+ private void expectTopicCreation(String topic) {
+ if (config.topicCreationEnable()) {
+ EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
+ Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+ EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
+ }
+ }
+
+ /**
+  * Builds a creation response listing {@code topic} as the only created topic and no existing topics.
+  */
+ private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
+     return new TopicAdmin.TopicCreationResponse(Collections.singleton(topic), Collections.emptySet());
+ }
+
+ /**
+  * Builds a creation response listing {@code topic} as the only existing topic and no created topics.
+  */
+ private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
+     return new TopicAdmin.TopicCreationResponse(Collections.emptySet(), Collections.singleton(topic));
+ }
+
+ private void expectPreliminaryCalls() {
+ expectPreliminaryCalls(TOPIC);
+ }
+
+ private void expectPreliminaryCalls(String topic) {
+ expectConvertHeadersAndKeyValue(topic, true, emptyHeaders());
+ expectApplyTransformationChain(false);
+ PowerMock.expectLastCall();
+ }
+
+ private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) {
+ for (Header header : headers) {
+ IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
+ if (anyTimes)
+ convertHeaderExpect.andStubReturn(header.value());
+ else
+ convertHeaderExpect.andReturn(header.value());
+ }
+ IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
+ if (anyTimes)
+ convertKeyExpect.andStubReturn(SERIALIZED_KEY);
+ else
+ convertKeyExpect.andReturn(SERIALIZED_KEY);
+ IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD));
+ if (anyTimes)
+ convertValueExpect.andStubReturn(SERIALIZED_RECORD);
+ else
+ convertValueExpect.andReturn(SERIALIZED_RECORD);
+ }
+
+ /**
+  * Expects {@code transformationChain.apply} to be invoked and answers with the very record
+  * it was given, i.e. the chain behaves as an identity transformation.
+  *
+  * @param anyTimes if true, the answer is registered as a stub; otherwise as a one-shot expectation
+  */
+ private void expectApplyTransformationChain(boolean anyTimes) {
+     final Capture<SourceRecord> appliedRecord = EasyMock.newCapture();
+     final IExpectationSetters<SourceRecord> applyExpectation =
+             EasyMock.expect(transformationChain.apply(EasyMock.capture(appliedRecord)));
+     if (anyTimes) {
+         applyExpectation.andStubAnswer(() -> appliedRecord.getValue());
+     } else {
+         applyExpectation.andAnswer(() -> appliedRecord.getValue());
+     }
+ }
+
+ private RecordHeaders emptyHeaders() {
+ return new RecordHeaders();
+ }
+
+ /**
+  * Creates the task under test using the key, value and header converter fields of this test class.
+  */
+ private void createWorkerTask() {
+     createWorkerTask(this.keyConverter, this.valueConverter, this.headerConverter);
+ }
+
+ /**
+  * Instantiates {@code workerTask} as an anonymous {@link AbstractWorkerSourceTask} in the
+  * {@code STARTED} target state whose abstract hooks all do nothing
+  * ({@code prepareToSendRecord} returns an empty {@link Optional}).
+  *
+  * @param keyConverter    converter used for record keys
+  * @param valueConverter  converter used for record values
+  * @param headerConverter converter used for record headers
+  */
+ private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+     workerTask = new AbstractWorkerSourceTask(
+             taskId,
+             sourceTask,
+             statusListener,
+             TargetState.STARTED,
+             keyConverter,
+             valueConverter,
+             headerConverter,
+             transformationChain,
+             sourceTaskContext,
+             producer,
+             admin,
+             TopicCreationGroup.configuredGroups(sourceConfig),
+             offsetReader,
+             offsetWriter,
+             offsetStore,
+             config,
+             metrics,
+             plugins.delegatingLoader(),
+             Time.SYSTEM,
+             RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+             statusBackingStore,
+             Runnable::run) {
+
+         // --- lifecycle hooks: intentionally empty ---
+
+         @Override
+         protected void prepareToInitializeTask() { }
+
+         @Override
+         protected void prepareToEnterSendLoop() { }
+
+         @Override
+         protected void beginSendIteration() { }
+
+         @Override
+         protected void prepareToPollTask() { }
+
+         // --- per-record hooks: intentionally empty ---
+
+         @Override
+         protected void recordDropped(SourceRecord record) { }
+
+         @Override
+         protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(
+                 SourceRecord sourceRecord,
+                 ProducerRecord<byte[], byte[]> producerRecord
+         ) {
+             return Optional.empty();
+         }
+
+         @Override
+         protected void recordDispatched(SourceRecord record) { }
+
+         @Override
+         protected void batchDispatched() { }
+
+         @Override
+         protected void recordSent(
+                 SourceRecord sourceRecord,
+                 ProducerRecord<byte[], byte[]> producerRecord,
+                 RecordMetadata recordMetadata
+         ) { }
+
+         @Override
+         protected void producerSendFailed(
+                 boolean synchronous,
+                 ProducerRecord<byte[], byte[]> producerRecord,
+                 SourceRecord preTransformRecord,
+                 Exception e
+         ) { }
+
+         @Override
+         protected void finalOffsetCommit(boolean failed) { }
+     };
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index e222894945..be26149d27 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
@@ -160,6 +161,8 @@ public class ErrorHandlingTaskTest {
OffsetStorageReaderImpl offsetReader;
@Mock
OffsetStorageWriter offsetWriter;
+ @Mock
+ private ConnectorOffsetBackingStore offsetStore;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@SuppressWarnings("unused")
@@ -530,6 +533,12 @@ public class ErrorHandlingTaskTest {
admin.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
+
+ offsetReader.close();
+ EasyMock.expectLastCall();
+
+ offsetStore.stop();
+ EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {
@@ -590,7 +599,7 @@ public class ErrorHandlingTaskTest {
WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
- offsetReader, offsetWriter, workerConfig,
+ offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
statusBackingStore, (Executor) Runnable::run);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
new file mode 100644
index 0000000000..5cf34cb911
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.source.TransactionContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ParameterizedTest;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_INTERVAL_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@PowerMockIgnore({"javax.management.*",
+ "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(ParameterizedTest.class)
+public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
+ private static final String TOPIC = "topic";
+ private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+ private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
+
+ // Connect-format data
+ private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+ private static final Integer KEY = -1;
+ private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+ private static final Long RECORD = 12L;
+ // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
+ // is used in the right place.
+ private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+ private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(); // tests submit workerTask to this to run it off the test thread
+ private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private WorkerConfig config; // built in setup() from workerProps()
+ private SourceConnectorConfig sourceConfig; // built in setup() from sourceConnectorProps()
+ private Plugins plugins;
+ private MockConnectMetrics metrics; // created in setup(), stopped in tearDown()
+ private Time time; // set to Time.SYSTEM in setup()
+ private CountDownLatch pollLatch; // reset to a count of 1 in setup(); individual tests may replace it
+ @Mock private SourceTask sourceTask;
+ @Mock private Converter keyConverter;
+ @Mock private Converter valueConverter;
+ @Mock private HeaderConverter headerConverter;
+ @Mock private TransformationChain<SourceRecord> transformationChain;
+ @Mock private KafkaProducer<byte[], byte[]> producer;
+ @Mock private TopicAdmin admin;
+ @Mock private CloseableOffsetStorageReader offsetReader;
+ @Mock private OffsetStorageWriter offsetWriter;
+ @Mock private ClusterConfigState clusterConfigState;
+ private ExactlyOnceWorkerSourceTask workerTask; // the task under test; assigned by createWorkerTask(...)
+ @Mock private Future<RecordMetadata> sendFuture;
+ @MockStrict private TaskStatus.Listener statusListener; // strict mock, unlike the other mocks in this class
+ @Mock private StatusBackingStore statusBackingStore;
+ @Mock private ConnectorOffsetBackingStore offsetStore; // primaryOffsetsTopic() is stubbed in setup()
+ @Mock private Runnable preProducerCheck; // passed to the ExactlyOnceWorkerSourceTask constructor
+ @Mock private Runnable postProducerCheck; // passed to the ExactlyOnceWorkerSourceTask constructor
+
+ private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks; // fresh capture created in setup()
+
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
+ static {
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+ }
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); // handed to workerTask.initialize(...) by the tests
+
+ private static final SourceRecord SOURCE_RECORD =
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ private static final List<SourceRecord> RECORDS = Collections.singletonList(SOURCE_RECORD);
+
+ private final boolean enableTopicCreation; // test parameter; written into TOPIC_CREATION_ENABLE_CONFIG by workerProps()
+
+ /**
+  * Supplies the values passed to the test constructor: topic creation disabled, then enabled.
+  */
+ @ParameterizedTest.Parameters
+ public static Collection<Boolean> parameters() {
+     final List<Boolean> topicCreationSettings = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
+     return topicCreationSettings;
+ }
+
+ /**
+  * @param enableTopicCreation value later written to the worker's topic-creation setting by {@code workerProps()}
+  */
+ public ExactlyOnceWorkerSourceTaskTest(boolean enableTopicCreation) {
+     // Stored as-is; consumed when the worker properties are built
+     this.enableTopicCreation = enableTopicCreation;
+ }
+
+ /**
+  * Rebuilds the per-test fixtures: worker and connector configs, plugins, metrics, the callback
+  * capture and the poll latch, and stubs the offset store's primary offsets topic name.
+  */
+ @Override
+ public void setup() {
+     super.setup();
+
+     // Configuration objects derived from the (parameterized) worker properties
+     final Map<String, String> props = workerProps();
+     this.plugins = new Plugins(props);
+     this.config = new StandaloneConfig(props);
+     this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorProps(), true);
+
+     // Fresh collaborators for each test
+     this.producerCallbacks = EasyMock.newCapture();
+     this.metrics = new MockConnectMetrics();
+     this.time = Time.SYSTEM;
+
+     EasyMock.expect(offsetStore.primaryOffsetsTopic()).andStubReturn("offsets-topic");
+
+     this.pollLatch = new CountDownLatch(1);
+ }
+
+ private Map<String, String> workerProps() {
+ Map<String, String> props = new HashMap<>();
+ props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
+ return props;
+ }
+
+ private Map<String, String> sourceConnectorProps() {
+ return sourceConnectorProps(SourceTask.TransactionBoundary.DEFAULT);
+ }
+
+ private Map<String, String> sourceConnectorProps(SourceTask.TransactionBoundary transactionBoundary) {
+ // setup up props for the source connector
+ Map<String, String> props = new HashMap<>();
+ props.put("name", "foo-connector");
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+ props.put(TOPIC_CONFIG, TOPIC);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", "bar"));
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
+ props.put(TRANSACTION_BOUNDARY_CONFIG, transactionBoundary.toString());
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + INCLUDE_REGEX_CONFIG, TOPIC);
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + INCLUDE_REGEX_CONFIG, ".*");
+ props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + EXCLUDE_REGEX_CONFIG, TOPIC);
+ return props;
+ }
+
+ /**
+  * Stops the metrics instance created in {@code setup()}, if there is one.
+  */
+ @After
+ public void tearDown() {
+     if (metrics == null) {
+         return;
+     }
+     metrics.stop();
+ }
+
+ private void createWorkerTask() {
+ createWorkerTask(TargetState.STARTED);
+ }
+
+ /**
+  * Creates the task under test in the given initial state, using this class's mocked converters.
+  *
+  * @param initialState target state the task is constructed with
+  */
+ private void createWorkerTask(TargetState initialState) {
+     createWorkerTask(initialState, this.keyConverter, this.valueConverter, this.headerConverter);
+ }
+
+ /**
+  * Instantiates {@code workerTask} as an {@link ExactlyOnceWorkerSourceTask} wired to this
+  * class's mocks and configs; submitted work runs inline via {@code Runnable::run}.
+  *
+  * @param initialState    target state the task is constructed with
+  * @param keyConverter    converter used for record keys
+  * @param valueConverter  converter used for record values
+  * @param headerConverter converter used for record headers
+  */
+ private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+     workerTask = new ExactlyOnceWorkerSourceTask(
+             taskId,
+             sourceTask,
+             statusListener,
+             initialState,
+             keyConverter,
+             valueConverter,
+             headerConverter,
+             transformationChain,
+             producer,
+             admin,
+             TopicCreationGroup.configuredGroups(sourceConfig),
+             offsetReader,
+             offsetWriter,
+             offsetStore,
+             config,
+             clusterConfigState,
+             metrics,
+             plugins.delegatingLoader(),
+             time,
+             RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+             statusBackingStore,
+             sourceConfig,
+             Runnable::run,
+             preProducerCheck,
+             postProducerCheck);
+ }
+
+ /**
+  * Starts the task in the {@code PAUSED} target state and checks that the status listener is
+  * told about the pause, and about the shutdown once the task is stopped.
+  */
+ @Test
+ public void testStartPaused() throws Exception {
+     final CountDownLatch pausedSignal = new CountDownLatch(1);
+
+     createWorkerTask(TargetState.PAUSED);
+
+     // Release the test thread as soon as the listener hears about the pause
+     expectCall(() -> statusListener.onPause(taskId)).andAnswer(() -> {
+         pausedSignal.countDown();
+         return null;
+     });
+
+     // Before pausing, the task asks the offset writer whether there is anything to flush
+     EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+
+     expectClose();
+
+     expectCall(() -> statusListener.onShutdown(taskId));
+
+     PowerMock.replayAll();
+
+     workerTask.initialize(TASK_CONFIG);
+     final Future<?> runningTask = executor.submit(workerTask);
+
+     final boolean pausedInTime = pausedSignal.await(5, TimeUnit.SECONDS);
+     assertTrue(pausedInTime);
+
+     workerTask.stop();
+     final boolean stoppedInTime = workerTask.awaitStop(1000);
+     assertTrue(stoppedInTime);
+
+     // Propagates any exception thrown on the task thread
+     runningTask.get();
+
+     PowerMock.verifyAll();
+ }
+
+ /**
+  * Runs the task until ten polls have been observed, pauses it, and checks that polling
+  * (almost) immediately stops. After shutdown, the number of offset flushes must equal the
+  * number of polls plus two (one flush on pause and one final commit).
+  */
+ @Test
+ public void testPause() throws Exception {
+     createWorkerTask();
+
+     expectPreflight();
+     expectStartup();
+
+     // Counters handed to the poll/flush expectation helpers (defined elsewhere in this class);
+     // the assertions below read them as the number of polls and flushes performed
+     AtomicInteger polls = new AtomicInteger(0);
+     AtomicInteger flushes = new AtomicInteger(0);
+     pollLatch = new CountDownLatch(10);
+     expectPolls(polls);
+     expectAnyFlushes(flushes);
+
+     expectTopicCreation(TOPIC);
+
+     expectCall(() -> statusListener.onPause(taskId));
+
+     expectCall(sourceTask::stop);
+     expectCall(() -> statusListener.onShutdown(taskId));
+
+     expectClose();
+
+     PowerMock.replayAll();
+
+     workerTask.initialize(TASK_CONFIG);
+     Future<?> taskFuture = executor.submit(workerTask);
+     assertTrue(awaitLatch(pollLatch));
+
+     workerTask.transitionTo(TargetState.PAUSED);
+
+     int priorCount = polls.get();
+     Thread.sleep(100);
+
+     // since the transition is observed asynchronously, the count could be off by one loop iteration
+     assertTrue(polls.get() - priorCount <= 1);
+
+     workerTask.stop();
+     assertTrue(workerTask.awaitStop(1000));
+
+     taskFuture.get();
+
+     // JUnit's signature is (message, expected, actual): the expected flush count is derived
+     // from the poll count, and the observed flush count is the actual value
+     assertEquals("Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit",
+             polls.get() + 2, flushes.get());
+
+     PowerMock.verifyAll();
+ }
+
+ @Test // A failure thrown by the pre-producer check should be reported through onFailure, after which the task's resources are closed
+ public void testFailureInPreProducerCheck() {
+ createWorkerTask();
+
+ Exception exception = new ConnectException("Failed to perform zombie fencing");
+ expectCall(preProducerCheck::run).andThrow(exception);
+
+ expectCall(() -> statusListener.onFailure(taskId, exception)); // the very same exception instance must reach the status listener
+
+ // Don't expect task to be stopped since it was never started to begin with
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ // No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
+ workerTask.run();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // A failure while starting the offset store (after a successful pre-producer check) should be reported through onFailure
+ public void testFailureInOffsetStoreStart() {
+ createWorkerTask();
+
+ expectCall(preProducerCheck::run);
+ Exception exception = new ConnectException("No soup for you!");
+ expectCall(offsetStore::start).andThrow(exception);
+
+ expectCall(() -> statusListener.onFailure(taskId, exception)); // the very same exception instance must reach the status listener
+
+ // Don't expect task to be stopped since it was never started to begin with
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ // No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
+ workerTask.run();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailureInProducerInitialization() {
+ // Verifies that a failure in producer.initTransactions() is reported through onFailure.
+ createWorkerTask();
+
+ expectCall(preProducerCheck::run);
+ expectCall(offsetStore::start);
+ Exception exception = new ConnectException("You can't do that!");
+ expectCall(producer::initTransactions).andThrow(exception);
+ // No expectation is recorded for the post-producer check: the failure occurs before it is reached
+
+ expectCall(() -> statusListener.onFailure(taskId, exception));
+
+ // Don't expect task to be stopped since it was never started to begin with
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ // No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
+ workerTask.run();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailureInPostProducerCheck() {
+ // Verifies that a failure in the post-producer check, which runs after the producer has been
+ // initialized successfully, is reported through onFailure.
+ createWorkerTask();
+
+ expectCall(preProducerCheck::run);
+ expectCall(offsetStore::start);
+ expectCall(producer::initTransactions);
+ Exception exception = new ConnectException("New task configs for the connector have already been generated");
+ expectCall(postProducerCheck::run).andThrow(exception);
+
+ expectCall(() -> statusListener.onFailure(taskId, exception));
+
+ // Don't expect task to be stopped since it was never started to begin with
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ // No need to execute on a separate thread; preflight checks should all take place before the poll-send loop starts
+ workerTask.run();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPollsInBackground() throws Exception {
+ // Verifies that the task keeps polling on its own thread until stopped, records poll and
+ // transaction metrics, and flushes offsets once per poll plus once at end of life.
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ AtomicInteger polls = new AtomicInteger(0);
+ AtomicInteger flushes = new AtomicInteger(0);
+ pollLatch = new CountDownLatch(10);
+ expectPolls(polls);
+ expectAnyFlushes(flushes);
+
+ expectTopicCreation(TOPIC);
+
+ expectCall(sourceTask::stop);
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(10);
+ assertTransactionMetrics(1);
+
+ // JUnit's assertEquals takes (message, expected, actual): the expectation is derived from the poll count,
+ // and the flush count is the value being checked
+ assertEquals("Task should have flushed offsets for every record poll and for end-of-life offset commit",
+ polls.get() + 1, flushes.get());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // An exception thrown from SourceTask::poll should be reported through onFailure and should stop the task without an explicit stop() call
+ public void testFailureInPoll() throws Exception {
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ final CountDownLatch pollLatch = new CountDownLatch(1); // local latch; shadows the fixture's pollLatch field within this method
+ final RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+ pollLatch.countDown();
+ throw exception;
+ });
+
+ expectCall(() -> statusListener.onFailure(taskId, exception));
+ expectCall(sourceTask::stop);
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ //Failure in poll should trigger automatic stop of the worker
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0); // the only poll threw, so no records were ever polled
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // If poll() throws after the task has been cancelled, no status-listener failure or shutdown callback is expected
+ public void testFailureInPollAfterCancel() throws Exception {
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ final CountDownLatch pollLatch = new CountDownLatch(1); // signals that the task thread has entered poll()
+ final CountDownLatch workerCancelLatch = new CountDownLatch(1); // holds poll() open until the test thread has called cancel()
+ final RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+ pollLatch.countDown();
+ assertTrue(awaitLatch(workerCancelLatch));
+ throw exception;
+ });
+
+ expectCall(offsetReader::close); // these two expectations mirror testCancel, i.e. what cancel() itself triggers
+ expectCall(() -> producer.close(Duration.ZERO));
+ expectCall(sourceTask::stop);
+
+ expectClose(); // records the regular close sequence in addition to the cancel-time closes above
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ workerTask.cancel();
+ workerCancelLatch.countDown(); // let the blocked poll() proceed and throw
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // If poll() throws after stop() has been requested, the task is expected to report onShutdown rather than onFailure
+ public void testFailureInPollAfterStop() throws Exception {
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ final CountDownLatch pollLatch = new CountDownLatch(1); // signals that the task thread has entered poll()
+ final CountDownLatch workerStopLatch = new CountDownLatch(1); // holds poll() open until the test thread has called stop()
+ final RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+ pollLatch.countDown();
+ assertTrue(awaitLatch(workerStopLatch));
+ throw exception;
+ });
+
+ expectCall(() -> statusListener.onShutdown(taskId));
+ expectCall(sourceTask::stop);
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ workerTask.stop();
+ workerStopLatch.countDown(); // let the blocked poll() proceed and throw
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPollReturnsNoRecords() throws Exception {
+ // Test that the task handles an empty list of records
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger()); // released after the first (empty) poll
+ EasyMock.expect(offsetWriter.willFlush()).andReturn(false).anyTimes(); // nothing is ever pending, so no flush expectations are recorded
+
+ expectCall(sourceTask::stop);
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0); // empty polls must not count towards the record metrics
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPollBasedCommit() throws Exception {
+ // Verifies that with the POLL transaction boundary the task flushes offsets once per record
+ // poll, plus once more for the end-of-life commit.
+ Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.POLL);
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
+
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ AtomicInteger polls = new AtomicInteger();
+ AtomicInteger flushes = new AtomicInteger();
+ expectPolls(polls);
+ expectAnyFlushes(flushes);
+
+ expectTopicCreation(TOPIC);
+
+ expectCall(sourceTask::stop);
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ // Uses the fixture's pollLatch field as-is; this method does not replace it
+ assertTrue(awaitLatch(pollLatch));
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+
+ // JUnit's assertEquals takes (message, expected, actual): the expectation is derived from the poll count,
+ // and the flush count is the value being checked
+ assertEquals("Task should have flushed offsets for every record poll, and for end-of-life offset commit",
+ polls.get() + 1, flushes.get());
+
+ assertPollMetrics(1);
+ assertTransactionMetrics(1);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // With the INTERVAL transaction boundary, flushes should only happen once the mocked clock has advanced by the configured interval
+ public void testIntervalBasedCommit() throws Exception {
+ long commitInterval = 618;
+ Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.INTERVAL);
+ connectorProps.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, Long.toString(commitInterval));
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
+
+ time = new MockTime(); // replaced before createWorkerTask() so the task is built with the mock clock this test advances
+
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ expectPolls();
+ final CountDownLatch firstPollLatch = new CountDownLatch(2); // each latch waits for two polls of its phase
+ final CountDownLatch secondPollLatch = new CountDownLatch(2);
+ final CountDownLatch thirdPollLatch = new CountDownLatch(2);
+
+ AtomicInteger flushes = new AtomicInteger();
+ expectFlush(FlushOutcome.SUCCEED, flushes); // exactly three flushes are scripted: two interval-driven, one at end of life
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+
+ expectTopicCreation(TOPIC);
+
+ expectCall(sourceTask::stop);
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ pollLatch = firstPollLatch; // the poll stub counts down whichever latch the pollLatch field currently holds
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("No flushes should have taken place before offset commit interval has elapsed", 0, flushes.get());
+ time.sleep(commitInterval); // advances the mock clock only
+
+ pollLatch = secondPollLatch;
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("One flush should have taken place after offset commit interval has elapsed", 1, flushes.get());
+ time.sleep(commitInterval * 2);
+
+ pollLatch = thirdPollLatch;
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Two flushes should have taken place after offset commit interval has elapsed again", 2, flushes.get());
+
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+
+ assertEquals("Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit",
+ 3, flushes.get());
+
+ assertPollMetrics(2);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // With the CONNECTOR transaction boundary, flushes should happen only when the connector requests a commit or abort through its TransactionContext
+ public void testConnectorBasedCommit() throws Exception {
+ Map<String, String> connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR);
+ sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation);
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ expectPolls();
+ List<CountDownLatch> pollLatches = IntStream.range(0, 7).mapToObj(i -> new CountDownLatch(3)).collect(Collectors.toList()); // one latch per phase below, each waiting for three polls
+
+ AtomicInteger flushes = new AtomicInteger();
+ // First flush: triggered by TransactionContext::commitTransaction (batch)
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+
+ // Second flush: triggered by TransactionContext::commitTransaction (record)
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+
+ // Third flush: triggered by TransactionContext::abortTransaction (batch)
+ expectCall(producer::abortTransaction);
+ EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+
+ // Fourth flush: triggered by TransactionContext::abortTransaction (record)
+ EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+ expectCall(producer::abortTransaction);
+ expectFlush(FlushOutcome.SUCCEED, flushes);
+
+ expectTopicCreation(TOPIC);
+
+ expectCall(sourceTask::stop);
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ TransactionContext transactionContext = workerTask.sourceTaskContext.transactionContext(); // the context through which this test issues commit/abort requests
+
+ int poll = -1; // index of the current phase's latch; pre-incremented before each use
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("No flushes should have taken place without connector requesting transaction commit", 0, flushes.get());
+
+ transactionContext.commitTransaction();
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("One flush should have taken place after connector requested batch commit", 1, flushes.get());
+
+ transactionContext.commitTransaction(SOURCE_RECORD);
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Two flushes should have taken place after connector requested individual record commit", 2, flushes.get());
+
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Only two flushes should still have taken place without connector re-requesting commit, even on identical records", 2, flushes.get());
+
+ transactionContext.abortTransaction();
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Three flushes should have taken place after connector requested batch abort", 3, flushes.get());
+
+ transactionContext.abortTransaction(SOURCE_RECORD);
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Four flushes should have taken place after connector requested individual record abort", 4, flushes.get());
+
+ pollLatch = pollLatches.get(++poll);
+ assertTrue(awaitLatch(pollLatch));
+ assertEquals("Only four flushes should still have taken place without connector re-requesting abort, even on identical records", 4, flushes.get());
+
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+
+ assertEquals("Task should have flushed offsets four times based on connector-defined boundaries, and skipped final end-of-life offset commit",
+ 4, flushes.get());
+
+ assertPollMetrics(1);
+ assertTransactionMetrics(2);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // Commit-failure scenario in which the offset flush callback is completed with an error
+ public void testCommitFlushCallbackFailure() throws Exception {
+ testCommitFailure(FlushOutcome.FAIL_FLUSH_CALLBACK);
+ }
+
+ @Test // Commit-failure scenario in which producer.commitTransaction() throws
+ public void testCommitTransactionFailure() throws Exception {
+ testCommitFailure(FlushOutcome.FAIL_TRANSACTION_COMMIT);
+ }
+
+ private void testCommitFailure(FlushOutcome causeOfFailure) throws Exception { // Shared body for the commit-failure tests: scripts one failing flush and expects the task to report onFailure
+ createWorkerTask();
+
+ expectPreflight();
+ expectStartup();
+
+ expectPolls();
+ expectFlush(causeOfFailure);
+
+ expectTopicCreation(TOPIC);
+
+ expectCall(sourceTask::stop);
+ // Unlike the standard WorkerSourceTask class, this one fails permanently when offset commits don't succeed
+ final CountDownLatch taskFailure = new CountDownLatch(1);
+ expectCall(() -> statusListener.onFailure(EasyMock.eq(taskId), EasyMock.anyObject())) // any throwable is accepted; only the task id is matched
+ .andAnswer(() -> {
+ taskFailure.countDown();
+ return null;
+ });
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(taskFailure)); // wait for the failure to be reported before stopping
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(1);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // A retriable synchronous producer failure should leave the unsent records in toSend so that a second sendRecords() call can finish them
+ public void testSendRecordsRetries() throws Exception {
+ createWorkerTask();
+
+ // Differentiate only by Kafka partition so we can reuse conversion expectations
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectTopicCreation(TOPIC);
+
+ // First round
+ expectSendRecordOnce(false);
+ expectCall(producer::beginTransaction);
+ // Any Producer retriable exception should work here
+ expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+ // Second round
+ expectSendRecordOnce(true); // retry of record2: isRetry=true records no offsetWriter.offset expectation for it
+ expectSendRecordOnce(false);
+
+ PowerMock.replayAll();
+
+ // Try to send 3, make first pass, second fail. Should save last two
+ workerTask.toSend = Arrays.asList(record1, record2, record3);
+ workerTask.sendRecords();
+ assertEquals(Arrays.asList(record2, record3), workerTask.toSend);
+
+ // Next they all succeed
+ workerTask.sendRecords();
+ assertNull(workerTask.toSend);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // A KafkaException thrown synchronously by producer.send should surface from sendRecords() as a ConnectException
+ public void testSendRecordsProducerSendFailsImmediately() {
+ if (!enableTopicCreation)
+ // should only test with topic creation enabled
+ return;
+
+ createWorkerTask();
+
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectCall(producer::beginTransaction);
+ expectTopicCreation(TOPIC);
+ expectConvertHeadersAndKeyValue(TOPIC, true, emptyHeaders());
+ expectApplyTransformationChain(false);
+
+ EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
+
+ PowerMock.replayAll();
+
+ workerTask.toSend = Arrays.asList(record1, record2);
+ assertThrows(ConnectException.class, workerTask::sendRecords);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // stop() arriving while SourceTask::start is still running should lead to a clean shutdown without any polling
+ public void testSlowTaskStart() throws Exception {
+ final CountDownLatch startupLatch = new CountDownLatch(1); // signals that SourceTask::start has been entered
+ final CountDownLatch finishStartupLatch = new CountDownLatch(1); // holds SourceTask::start open until the test has called stop()
+
+ createWorkerTask();
+
+ expectPreflight();
+
+ expectCall(() -> sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)));
+ expectCall(() -> sourceTask.start(TASK_PROPS));
+ EasyMock.expectLastCall().andAnswer(() -> { // attaches the blocking answer to the start(TASK_PROPS) call recorded just above
+ startupLatch.countDown();
+ assertTrue(awaitLatch(finishStartupLatch));
+ return null;
+ });
+
+ expectCall(() -> statusListener.onStartup(taskId));
+
+ expectCall(sourceTask::stop);
+ EasyMock.expect(offsetWriter.willFlush()).andReturn(false);
+
+ expectCall(() -> statusListener.onShutdown(taskId));
+
+ expectClose();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> workerTaskFuture = executor.submit(workerTask);
+
+ // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+ // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+ // cannot be invoked immediately in the thread trying to stop the task.
+ assertTrue(awaitLatch(startupLatch));
+ workerTask.stop();
+ finishStartupLatch.countDown();
+ assertTrue(workerTask.awaitStop(1000));
+
+ workerTaskFuture.get();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test // cancel() on a task that was never run should close the offset reader and close the producer with a zero timeout
+ public void testCancel() {
+ createWorkerTask();
+
+ expectCall(offsetReader::close);
+ expectCall(() -> producer.close(Duration.ZERO));
+
+ PowerMock.replayAll();
+
+ // workerTask said something dumb on twitter
+ workerTask.cancel();
+
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Builds a topic-creation response in which {@code topic} is the only created topic and
+ * no topics are reported as already existing.
+ */
+ private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
+ return new TopicAdmin.TopicCreationResponse(Collections.singleton(topic), Collections.<String>emptySet());
+ }
+
+ /**
+ * Stubs {@code sourceTask.poll()} to return an empty list any number of times.
+ *
+ * @param minimum number of polls after which the returned latch reaches zero
+ * @param count counter incremented on every poll
+ * @return a latch that is counted down once per poll
+ */
+ private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException {
+ final CountDownLatch minimumPollsSeen = new CountDownLatch(minimum);
+ // The poll is stubbed rather than expected a fixed number of times because the task thread keeps
+ // running; the counter and latch only establish that *at least* `minimum` polls took place
+ IAnswer<List<SourceRecord>> emptyPoll = () -> {
+ count.incrementAndGet();
+ minimumPollsSeen.countDown();
+ Thread.sleep(10);
+ return Collections.emptyList();
+ };
+ EasyMock.expect(sourceTask.poll()).andStubAnswer(emptyPoll);
+ return minimumPollsSeen;
+ }
+
+ private void expectPolls(final AtomicInteger pollCount) throws Exception { // Stubs poll() to return RECORDS any number of times, counting each poll and counting down the current pollLatch field
+ expectCall(producer::beginTransaction).atLeastOnce();
+ // Note that we stub these to allow any number of calls because the thread will continue to
+ // run. The count passed in, together with the pollLatch field, just makes sure we get
+ // *at least* the number of calls the test waits for
+ EasyMock.expect(sourceTask.poll())
+ .andStubAnswer(() -> {
+ pollCount.incrementAndGet();
+ pollLatch.countDown(); // reads the field at call time, so tests may swap in a new latch between phases
+ Thread.sleep(10);
+ return RECORDS;
+ });
+ // Fallout of the poll() call
+ expectSendRecordAnyTimes();
+ }
+
+ private void expectPolls() throws Exception { // Convenience overload for tests that do not need the poll count
+ expectPolls(new AtomicInteger());
+ }
+
+ @SuppressWarnings("unchecked") // raw ProducerRecord.class / Callback.class matchers are used for the generic send() parameters
+ private void expectSendRecordSyncFailure(Throwable error) { // Scripts one record whose conversion and transformation succeed but whose producer.send call throws the given error
+ expectConvertHeadersAndKeyValue(false);
+ expectApplyTransformationChain(false);
+
+ offsetWriter.offset(PARTITION, OFFSET);
+ PowerMock.expectLastCall();
+
+ EasyMock.expect(
+ producer.send(EasyMock.anyObject(ProducerRecord.class),
+ EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
+ .andThrow(error);
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() { // Successful, non-retry send of a record, stubbed for any number of calls
+ return expectSendRecordSendSuccess(true, false);
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) { // Successful send of a record, expected exactly once; isRetry is forwarded to expectSendRecord
+ return expectSendRecordSendSuccess(false, isRetry);
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordSendSuccess(boolean anyTimes, boolean isRetry) { // Successful send to TOPIC with mocked converters and empty headers
+ return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, emptyHeaders());
+ }
+
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( // Records the full set of expectations for sending one record; returns the capture of the ProducerRecord passed to producer.send
+ String topic,
+ boolean anyTimes, // true: stub the expectations for any number of calls; false: expect exactly one call
+ boolean isRetry, // true: skip the offsetWriter.offset expectation
+ boolean sendSuccess, // selects whether producer callbacks are completed with metadata or with an exception
+ boolean isMockedConverters, // true: also record converter expectations
+ Headers headers
+ ) {
+ if (isMockedConverters) {
+ expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+ }
+
+ expectApplyTransformationChain(anyTimes);
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
+
+ // 1. Offset data is passed to the offset storage.
+ if (!isRetry) {
+ offsetWriter.offset(PARTITION, OFFSET);
+ if (anyTimes)
+ PowerMock.expectLastCall().anyTimes();
+ else
+ PowerMock.expectLastCall();
+ }
+
+ // 2. Converted data passed to the producer, which will need callbacks invoked for flush to work
+ IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
+ producer.send(EasyMock.capture(sent),
+ EasyMock.capture(producerCallbacks)));
+ IAnswer<Future<RecordMetadata>> expectResponse = () -> {
+ synchronized (producerCallbacks) {
+ for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { // complete every callback captured so far
+ if (sendSuccess) {
+ cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
+ 0L, 0, 0), null);
+ } else {
+ cb.onCompletion(null, new TopicAuthorizationException("foo"));
+ }
+ }
+ producerCallbacks.reset(); // clear the capture so already-completed callbacks are not invoked again on the next send
+ }
+ return sendFuture;
+ };
+ if (anyTimes)
+ expect.andStubAnswer(expectResponse);
+ else
+ expect.andAnswer(expectResponse);
+
+ if (sendSuccess) {
+ // 3. As a result of a successful producer send callback, we note the use of the topic
+ expectTaskGetTopic(anyTimes);
+ }
+
+ return sent;
+ }
+
+ private void expectConvertHeadersAndKeyValue(boolean anyTimes) { // Convenience overload: conversions for TOPIC with empty headers
+ expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders());
+ }
+
+ /**
+ * Records converter expectations for one record: every header in {@code headers}, then the key, then the value.
+ *
+ * @param topic topic name passed to the converters
+ * @param anyTimes if true the results are stubbed for any number of calls, otherwise each conversion is expected once
+ * @param headers headers whose conversion is expected; each header's own bytes are returned as its converted form
+ */
+ private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) {
+ for (Header header : headers) {
+ IExpectationSetters<byte[]> headerExpectation = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
+ if (!anyTimes)
+ headerExpectation.andReturn(header.value());
+ else
+ headerExpectation.andStubReturn(header.value());
+ }
+
+ IExpectationSetters<byte[]> keyExpectation = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
+ if (!anyTimes)
+ keyExpectation.andReturn(SERIALIZED_KEY);
+ else
+ keyExpectation.andStubReturn(SERIALIZED_KEY);
+
+ IExpectationSetters<byte[]> valueExpectation = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD));
+ if (!anyTimes)
+ valueExpectation.andReturn(SERIALIZED_RECORD);
+ else
+ valueExpectation.andStubReturn(SERIALIZED_RECORD);
+ }
+
+ /**
+ * Makes the mocked transformation chain behave as an identity function: whatever record is
+ * passed to {@code apply} is returned unchanged.
+ *
+ * @param anyTimes if true the behaviour is stubbed for any number of calls, otherwise exactly one call is expected
+ */
+ private void expectApplyTransformationChain(boolean anyTimes) {
+ final Capture<SourceRecord> passedRecord = EasyMock.newCapture();
+ final IAnswer<SourceRecord> echoRecord = passedRecord::getValue;
+ IExpectationSetters<SourceRecord> applyExpectation = EasyMock.expect(transformationChain.apply(EasyMock.capture(passedRecord)));
+ if (!anyTimes)
+ applyExpectation.andAnswer(echoRecord);
+ else
+ applyExpectation.andStubAnswer(echoRecord);
+ }
+
+ /**
+ * Records an expectation for {@code statusBackingStore.getTopic(connector, topic)} that answers with a
+ * {@link TopicStatus} built from the arguments of the call, task number 0, and the current fixture time.
+ *
+ * @param anyTimes if true the answer is stubbed for any number of calls, otherwise exactly one call is expected
+ */
+ private void expectTaskGetTopic(boolean anyTimes) {
+ final Capture<String> connectorName = EasyMock.newCapture();
+ final Capture<String> topicName = EasyMock.newCapture();
+ final IAnswer<TopicStatus> statusFromArguments = () -> new TopicStatus(
+ topicName.getValue(),
+ new ConnectorTaskId(connectorName.getValue(), 0),
+ time.milliseconds());
+ IExpectationSetters<TopicStatus> getTopicExpectation = EasyMock.expect(statusBackingStore.getTopic(
+ EasyMock.capture(connectorName),
+ EasyMock.capture(topicName)));
+ if (anyTimes) {
+ getTopicExpectation.andStubAnswer(statusFromArguments);
+ } else {
+ getTopicExpectation.andAnswer(statusFromArguments);
+ }
+ // NOTE(review): this check runs while expectations are still being recorded, so the captures have
+ // presumably not captured anything yet and the assertions below may never execute - confirm
+ boolean bothCaptured = connectorName.hasCaptured() && topicName.hasCaptured();
+ if (bothCaptured) {
+ assertEquals("job", connectorName.getValue());
+ assertEquals(TOPIC, topicName.getValue());
+ }
+ }
+
+ /**
+ * Waits up to five seconds for the given latch to reach zero.
+ *
+ * @param latch the latch to wait on
+ * @return true if the latch reached zero in time; false if the wait timed out or the thread was interrupted
+ */
+ private boolean awaitLatch(CountDownLatch latch) {
+ try {
+ return latch.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status instead of silently dropping it, so that callers further up
+ // the stack can still observe that this thread was interrupted
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ private enum FlushOutcome { // Scenarios that expectFlush can script for an offset flush and the accompanying transaction commit
+ SUCCEED, // one flush, which succeeds
+ SUCCEED_ANY_TIMES, // any number of flushes, all of which succeed
+ FAIL_FLUSH_CALLBACK, // doFlush completes its callback with an error
+ FAIL_TRANSACTION_COMMIT // producer.commitTransaction throws
+ }
+
+ private CountDownLatch expectFlush(FlushOutcome outcome, AtomicInteger flushCount) { // Scripts the expectations for a flush with the given outcome; flushCount is incremented and the returned latch counted down each time beginFlush is called
+ CountDownLatch result = new CountDownLatch(1);
+ org.easymock.IExpectationSetters<Boolean> flushBegin = EasyMock
+ .expect(offsetWriter.beginFlush())
+ .andAnswer(() -> {
+ flushCount.incrementAndGet();
+ result.countDown();
+ return true;
+ });
+ if (FlushOutcome.SUCCEED_ANY_TIMES.equals(outcome)) {
+ flushBegin.anyTimes();
+ }
+
+ Capture<Callback<Void>> flushCallback = EasyMock.newCapture(); // only invoked in the FAIL_FLUSH_CALLBACK case below
+ org.easymock.IExpectationSetters<Future<Void>> offsetFlush =
+ EasyMock.expect(offsetWriter.doFlush(EasyMock.capture(flushCallback)));
+ switch (outcome) {
+ case SUCCEED:
+ // The worker task doesn't actually use the returned future
+ offsetFlush.andReturn(null);
+ expectCall(producer::commitTransaction);
+ expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(), EasyMock.anyObject()));
+ expectCall(sourceTask::commit);
+ break;
+ case SUCCEED_ANY_TIMES:
+ // The worker task doesn't actually use the returned future
+ offsetFlush.andReturn(null).anyTimes();
+ expectCall(producer::commitTransaction).anyTimes();
+ expectCall(() -> sourceTask.commitRecord(EasyMock.anyObject(), EasyMock.anyObject())).anyTimes();
+ expectCall(sourceTask::commit).anyTimes();
+ break;
+ case FAIL_FLUSH_CALLBACK:
+ expectCall(producer::commitTransaction);
+ offsetFlush.andAnswer(() -> {
+ flushCallback.getValue().onCompletion(new RecordTooLargeException(), null); // complete the flush callback with an error
+ return null;
+ });
+ expectCall(offsetWriter::cancelFlush);
+ break;
+ case FAIL_TRANSACTION_COMMIT:
+ offsetFlush.andReturn(null);
+ expectCall(producer::commitTransaction)
+ .andThrow(new RecordTooLargeException());
+ expectCall(offsetWriter::cancelFlush);
+ break;
+ default:
+ fail("Unexpected flush outcome: " + outcome);
+ }
+ return result;
+ }
+
+ private CountDownLatch expectFlush(FlushOutcome outcome) { // Convenience overload for tests that do not need the flush count
+ return expectFlush(outcome, new AtomicInteger());
+ }
+
+ private CountDownLatch expectAnyFlushes(AtomicInteger flushCount) { // Allows any number of successful flushes: willFlush() always answers true and every flush succeeds
+ EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
+ return expectFlush(FlushOutcome.SUCCEED_ANY_TIMES, flushCount);
+ }
+
+ /**
+ * Sanity-checks the task's transaction-size metrics: the minimum is non-negative, the maximum is at
+ * least {@code minimumMaxSizeExpected}, and the average is consistent with both.
+ *
+ * @param minimumMaxSizeExpected lower bound for the {@code transaction-size-max} metric
+ */
+ private void assertTransactionMetrics(int minimumMaxSizeExpected) {
+ MetricGroup group = workerTask.transactionMetricsGroup().metricGroup();
+ double sizeMin = metrics.currentMetricValueAsDouble(group, "transaction-size-min");
+ double sizeMax = metrics.currentMetricValueAsDouble(group, "transaction-size-max");
+ double sizeAvg = metrics.currentMetricValueAsDouble(group, "transaction-size-avg");
+ assertTrue(sizeMin >= 0);
+ assertTrue(sizeMax >= minimumMaxSizeExpected);
+
+ // When min and max coincide (within tolerance) the average has to coincide with them as well;
+ // otherwise it must lie strictly between the two
+ boolean minAndMaxCoincide = sizeMax - sizeMin <= 0.000001d;
+ if (minAndMaxCoincide) {
+ assertEquals(sizeMax, sizeAvg, 0.000002d);
+ } else {
+ assertTrue("Average transaction size should be greater than minimum transaction size", sizeAvg > sizeMin);
+ assertTrue("Average transaction size should be less than maximum transaction size", sizeAvg < sizeMax);
+ }
+ }
+
+ private void assertPollMetrics(int minimumPollCountExpected) { // Checks the source-task and task metric groups against a lower bound on polled records; 0 means no records are expected to have been polled or written
+ MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
+ MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
+ double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate");
+ double pollTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-total");
+ if (minimumPollCountExpected > 0) {
+ assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-max"), 0.000001d); // every non-empty poll in these tests returns RECORDS
+ assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d);
+ assertTrue(pollRate > 0.0d);
+ } else {
+ assertTrue(pollRate == 0.0d);
+ }
+ assertTrue(pollTotal >= minimumPollCountExpected);
+
+ double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate");
+ double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total");
+ if (minimumPollCountExpected > 0) {
+ assertTrue(writeRate > 0.0d);
+ } else {
+ assertTrue(writeRate == 0.0d);
+ }
+ assertTrue(writeTotal >= minimumPollCountExpected);
+
+ double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms");
+ double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms");
+ if (minimumPollCountExpected > 0) {
+ assertTrue(pollBatchTimeMax >= 0.0d);
+ }
+ assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d); // NaN is accepted for the average
+ double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
+ double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
+ assertEquals(0, activeCount, 0.000001d); // no records may still be counted as active when this is called
+ if (minimumPollCountExpected > 0) {
+ assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+ }
+ }
+
+ private RecordHeaders emptyHeaders() { // Returns a fresh, empty RecordHeaders instance on every call
+ return new RecordHeaders();
+ }
+
+ private abstract static class TestSourceTask extends SourceTask { // Abstract SourceTask subclass that adds no members of its own; presumably the type mocked for the sourceTask field - confirm against the field declaration
+ }
+
+ @FunctionalInterface // Lets expectCall accept lambdas and method references whose target may throw a checked exception
+ private interface MockedMethodCall {
+ void invoke() throws Exception;
+ }
+
+ /**
+ * Invokes the given call on a mock that is in record mode and returns the expectation setters for it,
+ * so that void (or otherwise uninteresting) methods can be scripted in a single expression.
+ *
+ * @param call the mocked method invocation to record
+ * @return the result of {@code EasyMock.expectLastCall()} for that invocation
+ * @throws RuntimeException as thrown by the call itself, or wrapping any checked exception it throws
+ */
+ private static <T> org.easymock.IExpectationSetters<T> expectCall(MockedMethodCall call) {
+ try {
+ call.invoke();
+ } catch (Exception e) {
+ // Unchecked exceptions propagate untouched; only checked ones need wrapping
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException("Mocked method invocation threw a checked exception", e);
+ }
+ return EasyMock.expectLastCall();
+ }
+
+ private void expectPreflight() { // Records the four successful preflight steps in the order the failure tests exercise them
+ expectCall(preProducerCheck::run);
+ expectCall(offsetStore::start);
+ expectCall(producer::initTransactions);
+ expectCall(postProducerCheck::run);
+ }
+
+ private void expectStartup() { // Records a successful source task start-up: initialize, start with TASK_PROPS, then the onStartup status callback
+ expectCall(() -> sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)));
+ expectCall(() -> sourceTask.start(TASK_PROPS));
+ expectCall(() -> statusListener.onStartup(taskId));
+ }
+
+ private void expectClose() { // Records the resources the task is expected to release when it closes
+ expectCall(offsetStore::stop);
+ expectCall(() -> producer.close(EasyMock.anyObject(Duration.class))); // any timeout is accepted
+ expectCall(() -> admin.close(EasyMock.anyObject(Duration.class)));
+ expectCall(transformationChain::close);
+ expectCall(offsetReader::close);
+ }
+
+ /**
+ * When topic creation is enabled in the worker config, records the admin calls for a topic that does not
+ * exist yet: {@code describeTopics} finds nothing and {@code createOrFindTopics} reports the topic as created.
+ * Records nothing when topic creation is disabled.
+ *
+ * @param topic the topic expected to be described and then created
+ */
+ private void expectTopicCreation(String topic) {
+ if (!config.topicCreationEnable()) {
+ return;
+ }
+ EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
+ // The capture only serves as an argument matcher here; its captured value is never read
+ Capture<NewTopic> requestedTopic = EasyMock.newCapture();
+ EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(requestedTopic))).andReturn(createdTopic(topic));
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index 4028249a78..39d680a7d4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -178,8 +178,8 @@ public class SubmittedRecordsTest {
assertEquals(Collections.emptyMap(), committableOffsets.offsets());
assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1);
- assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecords.removeLastOccurrence(submittedRecord));
- assertFalse("Attempt to remove already-removed record from submitted queue should fail", submittedRecords.removeLastOccurrence(submittedRecord));
+ assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecord.drop());
+ assertFalse("Attempt to remove already-removed record from submitted queue should fail", submittedRecord.drop());
committableOffsets = submittedRecords.committableOffsets();
// Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
@@ -203,7 +203,7 @@ public class SubmittedRecordsTest {
assertMetadata(committableOffsets, 0, 2, 2, 1, PARTITION1, PARTITION2);
assertNoEmptyDeques();
- assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecords.removeLastOccurrence(recordToRemove));
+ assertTrue("First attempt to remove record from submitted queue should succeed", recordToRemove.drop());
committableOffsets = submittedRecords.committableOffsets();
// Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets
@@ -269,20 +269,20 @@ public class SubmittedRecordsTest {
submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
);
- submittedRecords.removeLastOccurrence(recordToRemove1);
+ recordToRemove1.drop();
assertFalse(
"Await should fail since only one of the two submitted records has been removed so far",
submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
);
- submittedRecords.removeLastOccurrence(recordToRemove1);
+ recordToRemove1.drop();
assertFalse(
"Await should fail since only one of the two submitted records has been removed so far, "
+ "even though that record has been removed twice",
submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
);
- submittedRecords.removeLastOccurrence(recordToRemove2);
+ recordToRemove2.drop();
assertTrue(
"Await should succeed since both submitted records have now been removed",
submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 29b85301e8..253efff8c0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -24,7 +24,8 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.easymock.Capture;
import org.apache.kafka.connect.util.Callback;
import org.easymock.EasyMock;
@@ -65,7 +66,8 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Mock Connector connector;
@Mock CloseableConnectorContext ctx;
@Mock ConnectorStatus.Listener listener;
- @Mock OffsetStorageReader offsetStorageReader;
+ @Mock CloseableOffsetStorageReader offsetStorageReader;
+ @Mock ConnectorOffsetBackingStore offsetStore;
@Mock ClassLoader classLoader;
@Before
@@ -99,9 +101,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertFailedMetric(workerConnector);
@@ -134,13 +142,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
expectLastCall();
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertFailedMetric(workerConnector);
@@ -177,13 +191,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
@@ -223,6 +243,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
@@ -231,7 +257,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSinkMetric(workerConnector);
@@ -273,6 +299,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall();
@@ -281,7 +313,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
@@ -316,13 +348,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
expectLastCall();
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSinkMetric(workerConnector);
@@ -358,13 +396,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
expectLastCall();
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSinkMetric(workerConnector);
@@ -407,9 +451,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
@@ -447,13 +497,19 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall().times(2);
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
@@ -495,6 +551,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
ctx.close();
expectLastCall();
+ offsetStorageReader.close();
+ expectLastCall();
+
+ offsetStore.stop();
+ expectLastCall();
+
Callback<TargetState> onStateChange = createStrictMock(Callback.class);
onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
expectLastCall();
@@ -503,7 +565,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedSourceMetric(workerConnector);
@@ -531,7 +593,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
Throwable e = exceptionCapture.getValue();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 5aeff5e9d8..322c37f05d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -16,17 +16,12 @@
*/
package org.apache.kafka.connect.runtime;
-import java.util.Collection;
import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
@@ -34,13 +29,9 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
-import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
@@ -50,6 +41,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
@@ -76,10 +68,9 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.powermock.reflect.Whitebox;
-import java.nio.ByteBuffer;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -106,7 +97,6 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -147,6 +137,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private TopicAdmin admin;
@Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
+ @Mock private ConnectorOffsetBackingStore offsetStore;
@Mock private ClusterConfigState clusterConfigState;
private WorkerSourceTask workerTask;
@Mock private Future<RecordMetadata> sendFuture;
@@ -235,16 +226,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator);
}
- private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
- HeaderConverter headerConverter) {
- createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
- }
-
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
- transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
- offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+ transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
+ offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
retryWithToleranceOperator, statusBackingStore, Runnable::run);
}
@@ -644,93 +630,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
- @Test
- public void testSendRecordsConvertsData() throws Exception {
- createWorkerTask();
-
- List<SourceRecord> records = new ArrayList<>();
- // Can just use the same record for key and value
- records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
- Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
- expectTopicCreation(TOPIC);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(SERIALIZED_KEY, sent.getValue().key());
- assertEquals(SERIALIZED_RECORD, sent.getValue().value());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testSendRecordsPropagatesTimestamp() throws Exception {
- final Long timestamp = System.currentTimeMillis();
-
- createWorkerTask();
-
- List<SourceRecord> records = Collections.singletonList(
- new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
- );
-
- Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
- expectTopicCreation(TOPIC);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(timestamp, sent.getValue().timestamp());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testSendRecordsCorruptTimestamp() throws Exception {
- final Long timestamp = -3L;
- createWorkerTask();
-
- List<SourceRecord> records = Collections.singletonList(
- new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
- );
-
- Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- assertThrows(InvalidRecordException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
- assertFalse(sent.hasCaptured());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testSendRecordsNoTimestamp() throws Exception {
- final Long timestamp = -1L;
- createWorkerTask();
-
- List<SourceRecord> records = Collections.singletonList(
- new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
- );
-
- Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
- expectTopicCreation(TOPIC);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertNull(sent.getValue().timestamp());
-
- PowerMock.verifyAll();
- }
-
@Test
public void testSendRecordsRetries() throws Exception {
createWorkerTask();
@@ -775,6 +674,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
expectTopicCreation(TOPIC);
expectSendRecordProducerCallbackFail();
+ expectApplyTransformationChain(false);
+ expectConvertHeadersAndKeyValue(false);
PowerMock.replayAll();
@@ -842,7 +743,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
expectSendRecordOnce();
expectSendRecordProducerCallbackFail();
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
@@ -919,440 +819,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
- @Test
- public void testMetricsGroup() {
- SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
- SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
- for (int i = 0; i != 10; ++i) {
- group.recordPoll(100, 1000 + i * 100);
- group.recordWrite(10);
- }
- for (int i = 0; i != 20; ++i) {
- group1.recordPoll(100, 1000 + i * 100);
- group1.recordWrite(10);
- }
- assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
- assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
- assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
- assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
- assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
- assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
- assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
-
- // Close the group
- group.close();
-
- for (MetricName metricName : group.metricGroup().metrics().metrics().keySet()) {
- // Metrics for this group should no longer exist
- assertFalse(group.metricGroup().groupId().includes(metricName));
- }
- // Sensors for this group should no longer exist
- assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
- assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
- assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
- assertNull(group.metricGroup().metrics().getSensor("partition-count"));
- assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
- assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
- assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
- assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
-
- assertEquals(2900.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
- assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
- assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
- assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
- assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
- assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
- assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
- }
-
- @Test
- public void testHeaders() throws Exception {
- Headers headers = new RecordHeaders();
- headers.add("header_key", "header_value".getBytes());
-
- org.apache.kafka.connect.header.Headers connectHeaders = new ConnectHeaders();
- connectHeaders.add("header_key", new SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
-
- createWorkerTask();
-
- List<SourceRecord> records = new ArrayList<>();
- records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
-
- expectTopicCreation(TOPIC);
-
- Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, true, true, true, headers);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(SERIALIZED_KEY, sent.getValue().key());
- assertEquals(SERIALIZED_RECORD, sent.getValue().value());
- assertEquals(headers, sent.getValue().headers());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testHeadersWithCustomConverter() throws Exception {
- StringConverter stringConverter = new StringConverter();
- SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
-
- createWorkerTask(TargetState.STARTED, stringConverter, testConverter, stringConverter);
-
- List<SourceRecord> records = new ArrayList<>();
-
- String stringA = "Árvíztűrő tükörfúrógép";
- org.apache.kafka.connect.header.Headers headersA = new ConnectHeaders();
- String encodingA = "latin2";
- headersA.addString("encoding", encodingA);
-
- records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
-
- String stringB = "Тестовое сообщение";
- org.apache.kafka.connect.header.Headers headersB = new ConnectHeaders();
- String encodingB = "koi8_r";
- headersB.addString("encoding", encodingB);
-
- records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
-
- expectTopicCreation(TOPIC);
-
- Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, true, true, false, null);
- Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, true, true, false, null);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", records);
- Whitebox.invokeMethod(workerTask, "sendRecords");
-
- assertEquals(ByteBuffer.wrap("a".getBytes()), ByteBuffer.wrap(sentRecordA.getValue().key()));
- assertEquals(
- ByteBuffer.wrap(stringA.getBytes(encodingA)),
- ByteBuffer.wrap(sentRecordA.getValue().value())
- );
- assertEquals(encodingA, new String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
-
- assertEquals(ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap(sentRecordB.getValue().key()));
- assertEquals(
- ByteBuffer.wrap(stringB.getBytes(encodingB)),
- ByteBuffer.wrap(sentRecordB.getValue().value())
- );
- assertEquals(encodingB, new String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testTopicCreateWhenTopicExists() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
- TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
-
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- }
-
- @Test
- public void testSendRecordsTopicDescribeRetries() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- // First round - call to describe the topic times out
- EasyMock.expect(admin.describeTopics(TOPIC))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
-
- // Second round - calls to describe and create succeed
- expectTopicCreation(TOPIC);
- // Exactly two records are sent
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
-
- // Next they all succeed
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertNull(Whitebox.getInternalState(workerTask, "toSend"));
- }
-
- @Test
- public void testSendRecordsTopicCreateRetries() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- // First call to describe the topic times out
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
-
- // Second round
- expectTopicCreation(TOPIC);
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
-
- // Next they all succeed
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertNull(Whitebox.getInternalState(workerTask, "toSend"));
- }
-
- @Test
- public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- // Differentiate only by Kafka partition so we can reuse conversion expectations
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- // First round
- expectPreliminaryCalls(OTHER_TOPIC);
- expectTopicCreation(TOPIC);
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- // First call to describe the topic times out
- EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
-
- // Second round
- expectTopicCreation(OTHER_TOPIC);
- expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());
-
- PowerMock.replayAll();
-
- // Try to send 3, make first pass, second fail. Should save last two
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
-
- // Next they all succeed
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertNull(Whitebox.getInternalState(workerTask, "toSend"));
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- // Differentiate only by Kafka partition so we can reuse conversion expectations
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- // First round
- expectPreliminaryCalls(OTHER_TOPIC);
- expectTopicCreation(TOPIC);
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
- // First call to create the topic times out
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
-
- // Second round
- expectTopicCreation(OTHER_TOPIC);
- expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());
-
- PowerMock.replayAll();
-
- // Try to send 3, make first pass, second fail. Should save last two
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
-
- // Next they all succeed
- Whitebox.invokeMethod(workerTask, "sendRecords");
- assertNull(Whitebox.getInternalState(workerTask, "toSend"));
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testTopicDescribeFails() {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC))
- .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
- }
-
- @Test
- public void testTopicCreateFails() {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
- .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
- assertTrue(newTopicCapture.hasCaptured());
- }
-
- @Test
- public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
- assertTrue(newTopicCapture.hasCaptured());
- }
-
- @Test
- public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
-
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- }
-
- @Test
- public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception {
- if (!enableTopicCreation)
- // should only test with topic creation enabled
- return;
-
- createWorkerTask();
-
- SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-
- expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
- expectSendRecordTaskCommitRecordSucceed(false);
- expectSendRecordTaskCommitRecordSucceed(false);
-
- PowerMock.replayAll();
-
- Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
- Whitebox.invokeMethod(workerTask, "sendRecords");
- }
-
private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
return new TopicAdmin.TopicCreationResponse(created, existing);
}
- private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
- Set<String> created = Collections.emptySet();
- Set<String> existing = Collections.singleton(topic);
- return new TopicAdmin.TopicCreationResponse(created, existing);
- }
-
private void expectPreliminaryCalls() {
expectPreliminaryCalls(TOPIC);
}
@@ -1625,6 +1097,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
transformationChain.close();
EasyMock.expectLastCall();
+
+ offsetReader.close();
+ EasyMock.expectLastCall();
+
+ offsetStore.stop();
+ EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b2d7e0a500..a15cf77e32 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -48,6 +49,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
@@ -93,10 +95,18 @@ import java.util.function.Function;
import static org.apache.kafka.clients.admin.AdminClientConfig.RETRY_BACKOFF_MS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
+import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -281,9 +291,8 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testStartAndStopConnector() throws Throwable {
-
final String connectorClass = SampleSourceConnector.class.getName();
- connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass);
+ connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
// Create
when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
@@ -363,9 +372,8 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testStartConnectorFailure() throws Exception {
-
final String nonConnectorClass = "java.util.HashMap";
- connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, nonConnectorClass); // Bad connector class name
+ connectorProps.put(CONNECTOR_CLASS_CONFIG, nonConnectorClass); // Bad connector class name
Exception exception = new ConnectException("Failed to find Connector");
@@ -413,7 +421,6 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByAlias() throws Throwable {
-
final String connectorAlias = "SampleSourceConnector";
when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
@@ -427,7 +434,7 @@ public class WorkerTest extends ThreadedTest {
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn("test-cluster");
- connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorAlias);
+ connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
connectorProps.put(SinkConnectorConfig.TOPICS_CONFIG, "gfieyls, wfru");
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -471,7 +478,6 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByShortAlias() throws Throwable {
-
final String shortConnectorAlias = "WorkerTest";
when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
@@ -582,7 +588,7 @@ public class WorkerTest extends ThreadedTest {
Map<String, String> expectedTaskProps = new HashMap<>();
expectedTaskProps.put("foo", "bar");
expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
- expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
+ expectedTaskProps.put(TOPICS_CONFIG, "foo,bar");
assertEquals(2, taskConfigs.size());
assertEquals(expectedTaskProps, taskConfigs.get(0));
assertEquals(expectedTaskProps, taskConfigs.get(1));
@@ -615,7 +621,7 @@ public class WorkerTest extends ThreadedTest {
}
@Test
- public void testAddRemoveTask() {
+ public void testAddRemoveSourceTask() {
when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
when(delegatingLoader.connectorLoader(SampleSourceConnector.class.getName())).thenReturn(pluginLoader);
@@ -639,7 +645,7 @@ public class WorkerTest extends ThreadedTest {
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+ worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@@ -664,7 +670,136 @@ public class WorkerTest extends ThreadedTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ }
+
+ @Test
+ public void testAddRemoveSinkTask() {
+ // Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well
+ when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+ when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+ when(delegatingLoader.connectorLoader(SampleSinkConnector.class.getName())).thenReturn(pluginLoader);
+
+ SinkTask task = mock(TestSinkTask.class);
+ when(plugins.newTask(TestSinkTask.class)).thenReturn(task);
+ when(task.version()).thenReturn("1.0");
+ mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
+ mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
+ mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+ when(executorService.submit(any(WorkerSinkTask.class))).thenReturn(null);
+ doReturn(SampleSinkConnector.class).when(plugins).connectorClass(SampleSinkConnector.class.getName());
+ pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+ pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+
+ Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
+ worker.herder = herder;
+ worker.start();
+
+ assertStatistics(worker, 0, 0);
+ assertEquals(Collections.emptySet(), worker.taskIds());
+ Map<String, String> connectorConfigs = anyConnectorConfigMap();
+ connectorConfigs.put(TOPICS_CONFIG, "t1");
+ connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+
+ worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED);
+ assertStatistics(worker, 0, 1);
+ assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
+ worker.stopAndAwaitTask(TASK_ID);
+ assertStatistics(worker, 0, 0);
+ assertEquals(Collections.emptySet(), worker.taskIds());
+ // Nothing should be left, so this should effectively be a nop
+ worker.stop();
+ assertStatistics(worker, 0, 0);
+
+ verify(plugins, times(2)).currentThreadLoader();
+ verify(plugins).newTask(TestSinkTask.class);
+ verify(task).version();
+ verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
+ verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
+ verifyTaskHeaderConverter();
+
+ verify(executorService).submit(any(WorkerSinkTask.class));
+ verify(plugins).delegatingLoader();
+ verify(delegatingLoader).connectorLoader(SampleSinkConnector.class.getName());
+ verify(plugins).connectorClass(SampleSinkConnector.class.getName());
+
+ pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
+ pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
+ connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ }
+
+ @Test
+ public void testAddRemoveExactlyOnceSourceTask() {
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ workerProps.put("config.providers", "file");
+ workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ mockFileProviderTestId = UUID.randomUUID().toString();
+ workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
+ workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
+ workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+ workerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:2606");
+ workerProps.put(OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+ workerProps.put(CONFIG_TOPIC_CONFIG, "connect-configs");
+ workerProps.put(STATUS_STORAGE_TOPIC_CONFIG, "connect-statuses");
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ config = new DistributedConfig(workerProps);
+
+ when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+ when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+ when(delegatingLoader.connectorLoader(SampleSourceConnector.class.getName())).thenReturn(pluginLoader);
+
+ when(plugins.newTask(TestSourceTask.class)).thenReturn(task);
+ when(task.version()).thenReturn("1.0");
+ mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
+ mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
+ mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
+ when(executorService.submit(any(ExactlyOnceWorkerSourceTask.class))).thenReturn(null);
+ doReturn(SampleSourceConnector.class).when(plugins).connectorClass(SampleSourceConnector.class.getName());
+ pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+ pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+
+ Runnable preProducer = mock(Runnable.class);
+ Runnable postProducer = mock(Runnable.class);
+
+ Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
+ worker.herder = herder;
+ worker.start();
+ assertStatistics(worker, 0, 0);
+ assertEquals(Collections.emptySet(), worker.taskIds());
+ worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer);
+ assertStatistics(worker, 0, 1);
+ assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
+ worker.stopAndAwaitTask(TASK_ID);
+ assertStatistics(worker, 0, 0);
+ assertEquals(Collections.emptySet(), worker.taskIds());
+ // Nothing should be left, so this should effectively be a nop
+ worker.stop();
+ assertStatistics(worker, 0, 0);
+
+ verify(plugins, times(2)).currentThreadLoader();
+ verify(plugins).newTask(TestSourceTask.class);
+ verify(task).version();
+ verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
+ verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
+ verifyTaskHeaderConverter();
+
+ verify(executorService).submit(any(ExactlyOnceWorkerSourceTask.class));
+ verify(plugins).delegatingLoader();
+ verify(delegatingLoader).connectorLoader(SampleSourceConnector.class.getName());
+ verify(plugins).connectorClass(SampleSourceConnector.class.getName());
+
+ pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
+ pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
+ connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -674,7 +809,6 @@ public class WorkerTest extends ThreadedTest {
mockFileConfigProvider();
-
when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -701,7 +835,6 @@ public class WorkerTest extends ThreadedTest {
doReturn(SampleSourceConnector.class).when(plugins).connectorClass(SampleSourceConnector.class.getName());
-
// Each time we check the task metrics, the worker will call the herder
when(herder.taskStatus(TASK_ID)).thenReturn(
new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg"),
@@ -725,7 +858,7 @@ public class WorkerTest extends ThreadedTest {
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.startTask(
+ worker.startSourceTask(
TASK_ID,
ClusterConfigState.EMPTY,
anyConnectorConfigMap(),
@@ -766,7 +899,6 @@ public class WorkerTest extends ThreadedTest {
verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
verifyTaskHeaderConverter();
verify(plugins, times(2)).currentThreadLoader();
-
}
@Test
@@ -819,7 +951,7 @@ public class WorkerTest extends ThreadedTest {
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
- assertFalse(worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+ assertFalse(worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
assertStartupStatistics(worker, 0, 0, 1, 1);
assertStatistics(worker, 0, 0);
@@ -870,7 +1002,7 @@ public class WorkerTest extends ThreadedTest {
worker.herder = herder;
worker.start();
assertStatistics(worker, 0, 0);
- worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+ worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
assertStatistics(worker, 0, 1);
worker.stop();
assertStatistics(worker, 0, 0);
@@ -939,7 +1071,7 @@ public class WorkerTest extends ThreadedTest {
Map<String, String> connProps = anyConnectorConfigMap();
connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, SampleConverterWithHeaders.class.getName());
connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, SampleConverterWithHeaders.class.getName());
- worker.startTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED);
+ worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED);
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@@ -976,8 +1108,7 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("client.id", "connector-producer-job-0");
expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID.connector(), "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
-
+ Worker.baseProducerConfigs(CONNECTOR_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX);
}
@@ -996,8 +1127,9 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>());
+
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID.connector(), "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
+ Worker.baseProducerConfigs(CONNECTOR_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX);
}
@@ -1023,28 +1155,27 @@ public class WorkerTest extends ThreadedTest {
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connConfig);
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID.connector(), "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
-
+ Worker.baseProducerConfigs(CONNECTOR_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX);
}
@Test
public void testConsumerConfigsWithoutOverrides() {
Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
- expectedConfigs.put("group.id", "connect-test");
- expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("group.id", "connect-test-connector");
+ expectedConfigs.put("client.id", "connector-consumer-job-0");
expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>());
- assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
- verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX);
+ assertEquals(expectedConfigs, Worker.baseConsumerConfigs(CONNECTOR_ID, "connector-consumer-" + TASK_ID, config, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
}
@Test
public void testConsumerConfigsWithOverrides() {
Map<String, String> props = new HashMap<>(workerProps);
+ props.put("consumer.group.id", "connect-test");
props.put("consumer.auto.offset.reset", "latest");
props.put("consumer.max.poll.records", "1000");
props.put("consumer.client.id", "consumer-test-id");
@@ -1059,9 +1190,8 @@ public class WorkerTest extends ThreadedTest {
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>());
- assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
-
+ assertEquals(expectedConfigs, Worker.baseConsumerConfigs(CONNECTOR_ID, "connector-consumer-" + TASK_ID, configWithOverrides, connectorConfig,
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX);
}
@@ -1073,11 +1203,11 @@ public class WorkerTest extends ThreadedTest {
WorkerConfig configWithOverrides = new StandaloneConfig(props);
Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
- expectedConfigs.put("group.id", "connect-test");
+ expectedConfigs.put("group.id", "connect-test-connector");
expectedConfigs.put("auto.offset.reset", "latest");
expectedConfigs.put("max.poll.records", "5000");
expectedConfigs.put("max.poll.interval.ms", "1000");
- expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("client.id", "connector-consumer-job-0");
expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
Map<String, Object> connConfig = new HashMap<>();
@@ -1086,9 +1216,8 @@ public class WorkerTest extends ThreadedTest {
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connConfig);
- assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
-
+ assertEquals(expectedConfigs, Worker.baseConsumerConfigs(CONNECTOR_ID, "connector-consumer-" + TASK_ID, configWithOverrides, connectorConfig,
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX);
}
@@ -1104,9 +1233,8 @@ public class WorkerTest extends ThreadedTest {
connConfig.put("max.poll.interval.ms", "1000");
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connConfig);
- assertThrows(ConnectException.class, () -> Worker.consumerConfigs(new ConnectorTaskId("test", 1),
- configWithOverrides, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
-
+ assertThrows(ConnectException.class, () -> Worker.baseConsumerConfigs(CONNECTOR_ID, "connector-consumer-" + TASK_ID,
+ configWithOverrides, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX);
}
@@ -1129,9 +1257,9 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX)).thenReturn(connConfig);
- assertEquals(expectedConfigs, Worker.adminConfigs(CONNECTOR_ID, "", configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SOURCE));
+ assertEquals(expectedConfigs, Worker.adminConfigs(CONNECTOR_ID, "", configWithOverrides, connectorConfig,
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX);
}
@@ -1145,9 +1273,9 @@ public class WorkerTest extends ThreadedTest {
Map<String, Object> connConfig = Collections.singletonMap("metadata.max.age.ms", "10000");
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX)).thenReturn(connConfig);
- assertThrows(ConnectException.class, () -> Worker.adminConfigs(
- CONNECTOR_ID, "", configWithOverrides, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SOURCE));
+ assertThrows(ConnectException.class, () -> Worker.adminConfigs("test",
+ "", configWithOverrides, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK));
verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX);
}
@@ -1368,11 +1496,10 @@ public class WorkerTest extends ThreadedTest {
verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
}
-
private Map<String, String> anyConnectorConfigMap() {
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
- props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ props.put(CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
@@ -1402,5 +1529,27 @@ public class WorkerTest extends ThreadedTest {
}
}
+ private static class TestSinkTask extends SinkTask { // Minimal SinkTask stub: every lifecycle method below is a no-op.
+ public TestSinkTask() {
+ }
+ // Reports a fixed version string.
+ @Override
+ public String version() {
+ return "1.0";
+ }
+ // Ignores the supplied task properties.
+ @Override
+ public void start(Map<String, String> props) {
+ }
+ // Discards whatever records are delivered.
+ @Override
+ public void put(Collection<SinkRecord> records) {
+ }
+ // Nothing to release on shutdown.
+ @Override
+ public void stop() {
+ }
+
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java
new file mode 100644
index 0000000000..3bc2b2155d
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class WorkerTransactionContextTest {
+ // Exercises the commit/abort request methods of WorkerTransactionContext and the should* queries that report them.
+ private static final SourceRecord RECORD = new SourceRecord(null, null, "t", null, 0, null, null);
+ // Context under test; non-final because shouldDisallowConflictingRequests replaces it with a fresh instance.
+ private WorkerTransactionContext context = new WorkerTransactionContext();
+ // A batch-level commit request is reported only by shouldCommitBatch().
+ @Test
+ public void shouldNotifyOfBatchCommit() {
+ context.commitTransaction();
+ assertFalse(context.shouldAbortBatch());
+ assertFalse(context.shouldAbortOn(RECORD));
+ assertFalse(context.shouldCommitOn(RECORD));
+ assertTrue(context.shouldCommitBatch());
+ }
+ // A record-level commit request is reported only by shouldCommitOn() for that record.
+ @Test
+ public void shouldNotifyOfRecordCommit() {
+ context.commitTransaction(RECORD);
+ assertFalse(context.shouldAbortBatch());
+ assertFalse(context.shouldAbortOn(RECORD));
+ assertFalse(context.shouldCommitBatch());
+ assertTrue(context.shouldCommitOn(RECORD));
+ }
+ // A batch-level abort request is reported only by shouldAbortBatch().
+ @Test
+ public void shouldNotifyOfBatchAbort() {
+ context.abortTransaction();
+ assertFalse(context.shouldAbortOn(RECORD));
+ assertFalse(context.shouldCommitOn(RECORD));
+ assertFalse(context.shouldCommitBatch());
+ assertTrue(context.shouldAbortBatch());
+ }
+ // A record-level abort request is reported only by shouldAbortOn() for that record.
+ @Test
+ public void shouldNotifyOfRecordAbort() {
+ context.abortTransaction(RECORD);
+ assertFalse(context.shouldAbortBatch());
+ assertFalse(context.shouldCommitOn(RECORD));
+ assertFalse(context.shouldCommitBatch());
+ assertTrue(context.shouldAbortOn(RECORD));
+ }
+ // A requested batch commit is reported once; the next shouldCommitBatch() call returns false.
+ @Test
+ public void shouldNotCommitBatchRepeatedly() {
+ context.commitTransaction();
+ assertTrue(context.shouldCommitBatch());
+ assertFalse(context.shouldCommitBatch());
+ }
+ // A requested record commit is reported once; the next shouldCommitOn() call for that record returns false.
+ @Test
+ public void shouldNotCommitRecordRepeatedly() {
+ context.commitTransaction(RECORD);
+ assertTrue(context.shouldCommitOn(RECORD));
+ assertFalse(context.shouldCommitOn(RECORD));
+ }
+ // A requested batch abort is reported once; the next shouldAbortBatch() call returns false.
+ @Test
+ public void shouldNotAbortBatchRepeatedly() {
+ context.abortTransaction();
+ assertTrue(context.shouldAbortBatch());
+ assertFalse(context.shouldAbortBatch());
+ }
+ // A requested record abort is reported once; the next shouldAbortOn() call for that record returns false.
+ @Test
+ public void shouldNotAbortRecordRepeatedly() {
+ context.abortTransaction(RECORD);
+ assertTrue(context.shouldAbortOn(RECORD));
+ assertFalse(context.shouldAbortOn(RECORD));
+ }
+ // Requesting both commit and abort at the same granularity makes the matching should* queries throw IllegalStateException.
+ @Test
+ public void shouldDisallowConflictingRequests() {
+ context.commitTransaction();
+ context.abortTransaction();
+ assertThrows(IllegalStateException.class, context::shouldCommitBatch);
+ assertThrows(IllegalStateException.class, context::shouldAbortBatch);
+ // Start over with a fresh context to repeat the check at record granularity.
+ context = new WorkerTransactionContext();
+ context.commitTransaction(RECORD);
+ context.abortTransaction(RECORD);
+ assertThrows(IllegalStateException.class, () -> context.shouldCommitOn(RECORD));
+ assertThrows(IllegalStateException.class, () -> context.shouldAbortOn(RECORD));
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 71cdcc2915..3249412259 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -287,7 +287,7 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -321,7 +321,7 @@ public class DistributedHerderTest extends ThreadedTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -394,7 +394,7 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -467,7 +467,7 @@ public class DistributedHerderTest extends ThreadedTest {
rebalanceDelay);
expectConfigRefreshAndSnapshot(SNAPSHOT);
- worker.startTask(EasyMock.eq(TASK2), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK2), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -499,7 +499,7 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -538,7 +538,7 @@ public class DistributedHerderTest extends ThreadedTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -609,8 +609,8 @@ public class DistributedHerderTest extends ThreadedTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
- EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -1566,13 +1566,15 @@ public class DistributedHerderTest extends ThreadedTest {
// But only one task is assigned to this worker
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();
+ herder.configState = SNAPSHOT;
+
worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
PowerMock.expectLastCall();
herder.onRestart(TASK0);
EasyMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.anyObject(TargetState.class));
PowerMock.expectLastCall().andReturn(true);
@@ -1597,6 +1599,8 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.singletonList(CONN1)).anyTimes();
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
+ herder.configState = SNAPSHOT;
+
worker.stopAndAwaitConnector(CONN1);
PowerMock.expectLastCall();
@@ -1614,7 +1618,7 @@ public class DistributedHerderTest extends ThreadedTest {
herder.onRestart(taskId);
EasyMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.anyObject(TargetState.class));
PowerMock.expectLastCall().andReturn(true);
@@ -1634,7 +1638,7 @@ public class DistributedHerderTest extends ThreadedTest {
expectConfigRefreshAndSnapshot(SNAPSHOT);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -1648,7 +1652,7 @@ public class DistributedHerderTest extends ThreadedTest {
worker.stopAndAwaitTask(TASK0);
PowerMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -2034,7 +2038,7 @@ public class DistributedHerderTest extends ThreadedTest {
// join
expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
expectConfigRefreshAndSnapshot(SNAPSHOT);
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -2073,7 +2077,7 @@ public class DistributedHerderTest extends ThreadedTest {
// join
expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
expectConfigRefreshAndSnapshot(SNAPSHOT);
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -2121,7 +2125,7 @@ public class DistributedHerderTest extends ThreadedTest {
// join
expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -2190,7 +2194,7 @@ public class DistributedHerderTest extends ThreadedTest {
expectRebalance(Collections.emptyList(), Collections.emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(),
Arrays.asList(TASK0));
- worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -2237,7 +2241,7 @@ public class DistributedHerderTest extends ThreadedTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
@@ -2304,7 +2308,7 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
@@ -2396,7 +2400,7 @@ public class DistributedHerderTest extends ThreadedTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+ worker.startSourceTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
member.poll(EasyMock.anyInt());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 99848d8314..ddc030ada4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -369,7 +369,7 @@ public class StandaloneHerderTest {
new HashSet<>(),
new HashSet<>(),
transformer);
- worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+ worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(true);
PowerMock.replayAll();
@@ -410,7 +410,7 @@ public class StandaloneHerderTest {
new HashSet<>(),
new HashSet<>(),
transformer);
- worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+ worker.startSourceTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(false);
PowerMock.replayAll();
@@ -583,7 +583,7 @@ public class StandaloneHerderTest {
new HashSet<>(),
new HashSet<>(),
transformer);
- worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
+ worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(true);
PowerMock.replayAll();
@@ -649,7 +649,7 @@ public class StandaloneHerderTest {
new HashSet<>(),
new HashSet<>(),
transformer);
- worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
+ worker.startSinkTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SINK), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(true);
PowerMock.replayAll();
@@ -918,7 +918,11 @@ public class StandaloneHerderTest {
new HashSet<>(),
new HashSet<>(),
transformer);
- worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+ if (sourceSink.equals(SourceSink.SOURCE)) {
+ worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+ } else {
+ worker.startSinkTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+ }
EasyMock.expectLastCall().andReturn(true);
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 38116642fb..a97831801d 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -287,7 +287,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<!-- Suppress warnings about ignoring the return value of await.
This is done intentionally because we use other clues to determine
if the wait was cut short. -->
- <Class name="org.apache.kafka.connect.runtime.WorkerSourceTask"/>
+ <Class name="org.apache.kafka.connect.runtime.AbstractWorkerSourceTask"/>
<Method name="execute"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
</Match>
@@ -311,16 +311,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
- <Match>
- <!--
- Temporarily suppress warnings about unused private methods (will be used in a subsequent pull request)
- TODO: Remove this before merging to trunk
- -->
- <Class name="org.apache.kafka.connect.runtime.distributed.DistributedHerder"/>
- <Method name="verifyTaskGenerationAndOwnership"/>
- <Bug pattern="UPM_UNCALLED_PRIVATE_METHOD"/>
- </Match>
-
<Match>
<!-- Suppress a warning about intentional switch statement fallthrough. -->
<Class name="org.apache.kafka.common.security.authenticator.SaslClientAuthenticator"/>