You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/28 06:50:48 UTC

[kafka] branch trunk updated: KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38c1e96  KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)
38c1e96 is described below

commit 38c1e96d2c2084c7f3c3f9e5309ca91953e7c88f
Author: Aakash Shah <as...@confluent.io>
AuthorDate: Wed May 27 23:49:57 2020 -0700

    KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)
    
    Implementation for KIP-610: https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors based on which sink connectors can now report errors at the final stages of the stream that exports records to the sink system.
    
    This PR adds the `ErrantRecordReporter` interface as well as its implementation - `WorkerErrantRecordReporter`. The `WorkerErrantRecordReporter` is created in `Worker` and brought up through `WorkerSinkTask` to `WorkerSinkTaskContext`.
    
    An integration test and unit tests have been added.
    
    Reviewers: Lev Zemlyanov <le...@confluent.io>, Greg Harris <gr...@confluent.io>, Chris Egerton <ch...@confluent.io>, Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/connect/sink/ErrantRecordReporter.java   |  53 +++++++
 .../apache/kafka/connect/sink/SinkTaskContext.java |  26 ++++
 .../kafka/connect/runtime/InternalSinkRecord.java  |  82 ++++++++++
 .../kafka/connect/runtime/SinkConnectorConfig.java |   5 +
 .../org/apache/kafka/connect/runtime/Worker.java   |  20 ++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  23 ++-
 .../connect/runtime/WorkerSinkTaskContext.java     |   6 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  18 ++-
 .../connect/runtime/errors/ErrorReporter.java      |   9 +-
 .../kafka/connect/runtime/errors/LogReporter.java  |  11 +-
 .../connect/runtime/errors/ProcessingContext.java  |  24 ++-
 .../runtime/errors/RetryWithToleranceOperator.java |  11 ++
 .../runtime/errors/WorkerErrantRecordReporter.java | 167 +++++++++++++++++++++
 .../integration/ErrantRecordSinkConnector.java     |  61 ++++++++
 .../integration/ExampleConnectIntegrationTest.java |  69 +++++++++
 .../integration/MonitorableSinkConnector.java      |   8 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   2 +-
 .../ErrorHandlingTaskWithTopicCreationTest.java    |   6 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   2 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   6 +-
 .../connect/runtime/errors/ErrorReporterTest.java  |  35 +++++
 .../errors/RetryWithToleranceOperatorTest.java     |  14 ++
 .../errors/WorkerErrantRecordReporterTest.java     |  74 +++++++++
 .../connect/runtime/isolation/PluginUtilsTest.java |   3 +
 25 files changed, 710 insertions(+), 27 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d13cd88..0241e6e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -12,7 +12,7 @@
     <suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
               files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
     <suppress checks="NPathComplexity"
-              files="(MessageDataGenerator|FieldSpec).java"/>
+              files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
     <suppress checks="JavaNCSS"
               files="(ApiMessageType).java|MessageDataGenerator.java"/>
     <suppress checks="MethodLength"
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
new file mode 100644
index 0000000..a20e1e3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Component that a sink task can use, while processing records passed to
+ * {@link SinkTask#put(java.util.Collection)}, to report problematic records and the corresponding errors.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+    /**
+     * Report a problematic record and the corresponding error to be written to the sink
+     * connector's dead letter queue (DLQ).
+     *
+     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+     * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+     * record has been written or throw any exception that occurred while sending the record.
+     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+     * immediately.
+     *
+     * <p>Connect guarantees that sink records reported through this reporter will be written to the error topic
+     * before the framework calls the {@link SinkTask#preCommit(java.util.Map)} method and therefore before
+     * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
+     * are required.
+     *
+     * @param record the problematic record; may not be null
+     * @param error  the error capturing the problem with the record; may not be null
+     * @return a future that can be used to block until the record and error are reported
+     *         to the DLQ
+     * @throws ConnectException if the error reporter and DLQ fail to write a reported record
+     * @since 2.6
+     */
+    Future<Void> report(SinkRecord record, Throwable error);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 340ef80..a431dbe 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -95,4 +95,30 @@ public interface SinkTaskContext {
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records}
+     * passed to the {@link SinkTask#put(java.util.Collection)} method. When reporting a failed record,
+     * the sink task will receive a {@link java.util.concurrent.Future} that the task can optionally use to wait until
+     * the failed record and exception have been written to Kafka. Note that the result of
+     * this method may be null if this connector has not been configured to use a reporter.
+     *
+     * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
+     * maintain backward compatibility so they can also be deployed to older Connect runtimes
+     * should guard the call to this method with a try-catch block, since calling this method will result in a
+     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the sink connector is deployed to
+     * Connect runtimes older than Kafka 2.6. For example:
+     * <pre>
+     *     ErrantRecordReporter reporter;
+     *     try {
+     *         reporter = context.errantRecordReporter();
+     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *         reporter = null;
+     *     }
+     * </pre>
+     *
+     * @return the reporter; null if no error reporter has been configured for the connector
+     * @since 2.6
+     */
+    ErrantRecordReporter errantRecordReporter();
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
new file mode 100644
index 0000000..69554ff
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * A specialization of {@link SinkRecord} that allows a {@link WorkerSinkTask} to track the
+ * original {@link ConsumerRecord} for each {@link SinkRecord}. It is used internally and not
+ * exposed to connectors.
+ */
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                                 int partition, Schema keySchema, Object key, Schema valueSchema,
+                                 Object value, long kafkaOffset, Long timestamp,
+                                 TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
+        // Pass along the caller-supplied headers (not this record's own) so header changes are not lost;
+        // the original consumer record, offset and timestamp type are carried over unchanged.
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }
+
+    /**
+    * Return the original consumer record that this sink record represents.
+    *
+    * @return the original consumer record; never null
+    */
+    public ConsumerRecord<byte[], byte[]> originalRecord() {
+        return originalRecord;
+    }
+}
+
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 0672f4e..71ba9c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -120,6 +120,11 @@ public class SinkConnectorConfig extends ConnectorConfig {
         return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
     }
 
+    public boolean enableErrantRecordReporter() {
+        // A reporter is enabled when error logging is on or a dead letter queue topic name is set.
+        return !dlqTopicName().isEmpty() || enableErrorLog();
+    }
+
     public static void main(String[] args) {
         System.out.println(config.toHtml());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 28b1149..26b0444 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -557,13 +558,15 @@ public class Worker {
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
 
             Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
 
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                                       valueConverter, headerConverter, transformationChain, consumer, loader, time,
-                                      retryWithToleranceOperator, herder.statusBackingStore());
+                                      retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -718,6 +721,7 @@ public class Worker {
                                                                 connectorClientConfigOverridePolicy, kafkaClusterId);
             Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
+
             reporters.add(reporter);
         }
 
@@ -733,6 +737,20 @@ public class Worker {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        SinkConnectorConfig connConfig,
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // create a reporter only if the sink config enables one (DLQ topic set or error logging on); else return null
+        if (connConfig.enableErrantRecordReporter()) {
+            return new WorkerErrantRecordReporter(retryWithToleranceOperator, keyConverter, valueConverter, headerConverter);
+        }
+        return null;
+    }
+
     private void stopTask(ConnectorTaskId taskId) {
         try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
             WorkerTask task = tasks.get(taskId);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3df9451..8c4a496 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -94,6 +95,7 @@ class WorkerSinkTask extends WorkerTask {
     private int commitFailures;
     private boolean pausedForRedelivery;
     private boolean committing;
+    private final WorkerErrantRecordReporter workerErrantRecordReporter;
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
@@ -110,6 +112,7 @@ class WorkerSinkTask extends WorkerTask {
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator,
+                          WorkerErrantRecordReporter workerErrantRecordReporter,
                           StatusBackingStore statusBackingStore) {
         super(id, statusListener, initialState, loader, connectMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
@@ -136,6 +139,7 @@ class WorkerSinkTask extends WorkerTask {
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.workerErrantRecordReporter = workerErrantRecordReporter;
     }
 
     @Override
@@ -360,6 +364,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private void commitOffsets(long now, boolean closing) {
+        if (workerErrantRecordReporter != null) {
+            log.trace("Awaiting all reported errors to be completed");
+            workerErrantRecordReporter.awaitAllFutures();
+            log.trace("Completed all reported errors");
+        }
+
         if (currentOffsets.isEmpty())
             return;
 
@@ -502,7 +512,14 @@ class WorkerSinkTask extends WorkerTask {
         if (isTopicTrackingEnabled) {
             recordActiveTopic(origRecord.topic());
         }
-        return transformationChain.apply(origRecord);
+
+        // Apply the transformations
+        SinkRecord transformedRecord = transformationChain.apply(origRecord);
+        if (transformedRecord == null) {
+            return null;
+        }
+        // Error reporting will need to correlate each sink record with the original consumer record
+        return new InternalSinkRecord(msg, transformedRecord);
     }
 
     private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
@@ -518,6 +535,10 @@ class WorkerSinkTask extends WorkerTask {
         return result;
     }
 
+    protected WorkerErrantRecordReporter workerErrantRecordReporter() {
+        return workerErrantRecordReporter;
+    }
+
     private void resumeAll() {
         for (TopicPartition tp : consumer.assignment())
             if (!context.pausedPartitions().contains(tp))
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 3a6b0d6..724b02e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,6 +160,11 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
     }
 
     @Override
+    public ErrantRecordReporter errantRecordReporter() {
+        return sinkTask.workerErrantRecordReporter();
+    }
+
+    @Override
     public String toString() {
         return "WorkerSinkTaskContext{" +
                "id=" + sinkTask.id +
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 20ed2f2..e07a5d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
@@ -36,7 +37,9 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import static java.util.Collections.singleton;
 
@@ -67,6 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
     private final SinkConnectorConfig connConfig;
     private final ConnectorTaskId connectorTaskId;
     private final ErrorHandlingMetrics errorHandlingMetrics;
+    private final String dlqTopicName;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
 
@@ -111,25 +115,25 @@ public class DeadLetterQueueReporter implements ErrorReporter {
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName().trim();
     }
 
     /**
-     * Write the raw records into a Kafka topic.
+     * Write the raw records into a Kafka topic and return the producer future.
      *
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @return the future associated with the writing of this record; never null
      */
-    public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         ProducerRecord<byte[], byte[]> producerRecord;
@@ -145,7 +149,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
             populateContextHeaders(producerRecord, context);
         }
 
-        this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
+        return this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
                 log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
                 errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5eaa427..f9bc2f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -16,17 +16,22 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.concurrent.Future;
+
 /**
  * Report an error using the information contained in the {@link ProcessingContext}.
  */
 public interface ErrorReporter extends AutoCloseable {
 
     /**
-     * Report an error.
+     * Report an error and return the producer future.
      *
      * @param context the processing context (cannot be null).
+     * @return future result from the producer sending a record to Kafka.
      */
-    void report(ProcessingContext context);
+    Future<RecordMetadata> report(ProcessingContext context);
 
     @Override
     default void close() { }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
index 8b07adf..cf9db2c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 /**
  * Writes errors and their context to application logs.
@@ -29,6 +32,7 @@ import java.util.Objects;
 public class LogReporter implements ErrorReporter {
 
     private static final Logger log = LoggerFactory.getLogger(LogReporter.class);
+    private static final Future<RecordMetadata> COMPLETED = CompletableFuture.completedFuture(null);
 
     private final ConnectorTaskId id;
     private final ConnectorConfig connConfig;
@@ -50,17 +54,18 @@ public class LogReporter implements ErrorReporter {
      * @param context the processing context.
      */
     @Override
-    public void report(ProcessingContext context) {
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (!connConfig.enableErrorLog()) {
-            return;
+            return COMPLETED;
         }
 
         if (!context.failed()) {
-            return;
+            return COMPLETED;
         }
 
         log.error(message(context), context.error());
         errorHandlingMetrics.recordErrorLogged();
+        return COMPLETED;
     }
 
     // Visible for testing
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index e7fb031..0ddf894 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -17,13 +17,20 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
@@ -132,11 +139,22 @@ class ProcessingContext implements AutoCloseable {
 
     /**
      * Report errors. Should be called only if an error was encountered while executing the operation.
+     *
+     * @return an errant record future that potentially aggregates the producer futures
      */
-    public void report() {
-        for (ErrorReporter reporter: reporters) {
-            reporter.report(this);
+    public Future<Void> report() {
+        if (reporters.size() == 1) {
+            return new ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(this)));
+        }
+        // Keep only the futures still in flight; reports that already completed need no further waiting.
+        List<Future<RecordMetadata>> futures = reporters.stream()
+                .map(r -> r.report(this))
+                .filter(f -> !f.isDone())
+                .collect(Collectors.toCollection(LinkedList::new));
+        if (futures.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
         }
+        return new ErrantRecordFuture(futures);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 4e627ef..ff1702d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -83,6 +84,16 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         this.time = time;
     }
 
+    public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+                                      ConsumerRecord<byte[], byte[]> consumerRecord,
+                                      Throwable error) {
+        context.consumerRecord(consumerRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        errorHandlingMetrics.recordError();
+        return context.report();
+    }
+
     /**
      * Execute the recoverable operation. If the operation is already in a failed state, then simply return
      * with the existing failure.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
new file mode 100644
index 0000000..6e1fa53
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    protected final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter, Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // Start with an empty list of pending futures; the collaborators are stored as given
+        this.futures = new LinkedList<>();
+        this.headerConverter = headerConverter;
+        this.valueConverter = valueConverter;
+        this.keyConverter = keyConverter;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+            // A sink record built by the task may carry no timestamp; unboxing a null Long would throw
+            long timestamp = record.timestamp() != null ? record.timestamp() : ConsumerRecord.NO_TIMESTAMP;
+
+            consumerRecord = new ConsumerRecord<>(topic, record.kafkaPartition(),
+                record.kafkaOffset(), timestamp, record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Blocks until every future still tracked by this reporter has completed, removing each one as it
+     * is awaited. Throws ConnectException if a report failed or if the calling thread is interrupted.
+     */
+    public void awaitAllFutures() {
+        for (Future<?> future = futures.poll(); future != null; future = futures.poll()) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                // Keep the interrupt visible to the caller; wrapping it alone would silently clear it
+                if (e instanceof InterruptedException) Thread.currentThread().interrupt();
+                log.error("Encountered an error while awaiting an errant record future's completion.", e);
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+     * Connect user. It is done once every wrapped future is done, and cancellation is not supported.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+
+        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
+            futures = producerFutures;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return futures.stream().allMatch(Future::isDone);
+        }
+
+        public Void get() throws InterruptedException, ExecutionException {
+            for (Future<RecordMetadata> future: futures) {
+                future.get();
+            }
+            return null;
+        }
+
+        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            long deadlineNs = System.nanoTime() + unit.toNanos(timeout); // timeout bounds the whole call, not each future
+            for (Future<RecordMetadata> future: futures) {
+                future.get(Math.max(0L, deadlineNs - System.nanoTime()), TimeUnit.NANOSECONDS);
+            }
+            return null;
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
new file mode 100644
index 0000000..0fe2f88
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    /**
+     * Sink task that, for each record it is given, bumps the tracked offset of the record's
+     * topic partition and then passes the record to the reporter taken from the task context.
+     */
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            reporter = context.errantRecordReporter();
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+            for (SinkRecord sinkRecord : records) {
+                taskHandle.record();
+                TopicPartition partition = cachedTopicPartitions
+                    .computeIfAbsent(sinkRecord.topic(), topic -> new HashMap<>())
+                    .computeIfAbsent(sinkRecord.kafkaPartition(), id -> new TopicPartition(sinkRecord.topic(), id));
+                committedOffsets.merge(partition, 1L, Long::sum);
+                reporter.report(sinkRecord, new Throwable());
+            }
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 8538fb4..a2b40e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
@@ -65,6 +66,9 @@ public class ExampleConnectIntegrationTest {
     private static final String CONNECTOR_NAME = "simple-conn";
     private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
     private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+    private static final String DLQ_TOPIC = "dlq-topic";
+    private static final String ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME =
+        ErrantRecordSinkConnector.class.getSimpleName();
 
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
@@ -219,6 +223,71 @@ public class ExampleConnectIntegrationTest {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    /**
+     * Runs a sink connector whose task reports every record it receives through the errant record
+     * reporter, and requires at least as many records as were produced to reach the DLQ topic.
+     */
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(this::checkForPartitionAssignment,
+            CONNECTOR_SETUP_DURATION_MS,
+            "Connector tasks were not assigned a partition each.");
+
+        // produce some messages into source topic partitions
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+        }
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced.
+        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+            connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
+
+        // wait for the connector tasks to consume all records.
+        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+        // wait for the connector tasks to commit all records.
+        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
+
+        // consume all records from the dlq topic or fail, to ensure that every record was reported
+        connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, DLQ_TOPIC);
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+
     /**
      * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
      * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
index 05b2dfd..9bd0756 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -90,10 +90,10 @@ public class MonitorableSinkConnector extends TestSinkConnector {
 
         private String connectorName;
         private String taskId;
-        private TaskHandle taskHandle;
-        private Set<TopicPartition> assignments;
-        private Map<TopicPartition, Long> committedOffsets;
-        private Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions;
+        TaskHandle taskHandle;
+        Set<TopicPartition> assignments;
+        Map<TopicPartition, Long> committedOffsets;
+        Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions;
 
         public MonitorableSinkTask() {
             this.assignments = new HashSet<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 4434904..c471b03 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -538,7 +538,7 @@ public class ErrorHandlingTaskTest {
             taskId, sinkTask, statusListener, initialState, workerConfig,
             ClusterConfigState.EMPTY, metrics, converter, converter,
             headerConverter, sinkTransforms, consumer, pluginLoader, time,
-                retryWithToleranceOperator, statusBackingStore);
+                retryWithToleranceOperator, null, statusBackingStore);
     }
 
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 909ec4a..aba6445 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -161,6 +162,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
     @SuppressWarnings("unused")
     @Mock private StatusBackingStore statusBackingStore;
 
+    @Mock
+    private WorkerErrantRecordReporter workerErrantRecordReporter;
+
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     // when this test becomes parameterized, this variable will be a test parameter
@@ -548,7 +552,7 @@ public class ErrorHandlingTaskWithTopicCreationTest {
             taskId, sinkTask, statusListener, initialState, workerConfig,
             ClusterConfigState.EMPTY, metrics, converter, converter,
             headerConverter, sinkTransforms, consumer, pluginLoader, time,
-                retryWithToleranceOperator, statusBackingStore);
+                retryWithToleranceOperator, workerErrantRecordReporter, statusBackingStore);
     }
 
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5dc2f44..12a553f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -183,7 +183,7 @@ public class WorkerSinkTaskTest {
             taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
             keyConverter, valueConverter, headerConverter,
             transformationChain, consumer, pluginLoader, time,
-            RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+            RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index be2584a..7a82b20 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -144,7 +144,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
                 new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
-                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
 
         recordsReturned = 0;
     }
@@ -187,7 +187,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
             for (SinkRecord rec : recs) {
                 SinkRecord referenceSinkRecord
                         = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
-                assertEquals(referenceSinkRecord, rec);
+                InternalSinkRecord referenceInternalSinkRecord =
+                    new InternalSinkRecord(null, referenceSinkRecord);
+                assertEquals(referenceInternalSinkRecord, rec);
                 offset++;
             }
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f01cd49..85f6948 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -43,6 +43,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import static java.util.Collections.emptyMap;
@@ -147,6 +148,21 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testDLQReportAndReturnFuture() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+            producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+        // Expect a send on the mocked producer and hand back the `metadata` fixture as its result
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
+        replay(producer);
+
+        deadLetterQueueReporter.report(context);
+        // NOTE(review): the future returned by report() is not inspected, despite the test name
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testCloseDLQ() {
         DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
             producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
@@ -212,6 +228,25 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testLogReportAndReturnFuture() {
+        // Build a log reporter with both error-log settings switched on
+        Map<String, String> reporterProps = new HashMap<>();
+        reporterProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        reporterProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+        LogReporter logReporter = new LogReporter(TASK_ID, config(reporterProps), errorHandlingMetrics);
+        ProcessingContext context = processingContext();
+
+        String expectedMessage = "Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class " +
+            "'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test-topic', " +
+            "partition=5, offset=100}.";
+        assertEquals(expectedMessage, logReporter.message(context));
+
+        // report() on the log reporter must hand back a CompletableFuture
+        Future<RecordMetadata> reported = logReporter.report(context);
+        assertTrue(reported instanceof CompletableFuture);
+    }
+
+    @Test
     public void testSetDLQConfigs() {
         SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
         assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 43bac54..3aed19d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
@@ -88,12 +89,25 @@ public class RetryWithToleranceOperatorTest {
     private Operation<String> mockOperation;
 
     @Mock
+    private ConsumerRecord<byte[], byte[]> consumerRecord;
+
+    @Mock
     ErrorHandlingMetrics errorHandlingMetrics;
 
     @Mock
     Plugins plugins;
 
     @Test
+    public void testExecuteFailed() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+            ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        // NOTE(review): smoke test only; it passes if executeFailed() does not throw, and the returned future is not checked
+        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, new Throwable());
+    }
+
+    @Test
     public void testHandleExceptionInTransformations() {
         testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
new file mode 100644
index 0000000..07a4f9e
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit test for the pending-future bookkeeping of {@link WorkerErrantRecordReporter}.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerErrantRecordReporterTest {
+
+    private WorkerErrantRecordReporter reporter;
+
+    @Mock
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+
+    @Mock
+    private Converter converter;
+
+    @Mock
+    private HeaderConverter headerConverter;
+
+    @Mock
+    private SinkRecord record;
+
+    @Before
+    public void setup() {
+        // The same converter mock serves as both the key and the value converter
+        reporter = new WorkerErrantRecordReporter(
+            retryWithToleranceOperator, converter, converter, headerConverter);
+    }
+
+    @Test
+    public void testGetAllFutures() {
+        assertTrue(reporter.futures.isEmpty());
+        for (int remaining = 4; remaining > 0; remaining--) {
+            reporter.futures.add(CompletableFuture.completedFuture(null));
+        }
+        assertFalse(reporter.futures.isEmpty());
+        reporter.awaitAllFutures();
+        assertTrue(reporter.futures.isEmpty());
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index c406ead..4baaa6d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -117,6 +117,9 @@ public class PluginUtilsTest {
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.rest.ConnectRestExtension")
         );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.sink.ErrantRecordReporter"
+        ));
     }
 
     @Test