Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/28 02:05:49 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

kkonstantine commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431512019



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {

Review comment:
       nit: indentation is a bit off here. 
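       For illustration, assuming the convention here is to align continuation lines with the opening parenthesis:
   ```java
    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
                                 int partition, Schema keySchema, Object key, Schema valueSchema,
                                 Object value, long kafkaOffset, Long timestamp,
                                 TimestampType timestampType, Iterable<Header> headers) {
   ```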

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {

Review comment:
       A class javadoc would be helpful here, to explain why this class is being introduced. 
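       For example, a sketch of what such a javadoc might say (wording is illustrative, not from the PR):
   ```java
    /**
     * A {@link SinkRecord} that also carries the original {@link ConsumerRecord} it was
     * converted from, so that the errant record reporter can send the original,
     * pre-transformation record to the dead letter queue rather than the transformed one.
     * Internal to the Connect runtime.
     */
    public class InternalSinkRecord extends SinkRecord {
   ```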

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -553,13 +554,16 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter
+                = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,

Review comment:
       nit: I think we always keep `=` with the left operand. 
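       In other words, something like this (the call is truncated here exactly as in the diff):
   ```java
            WorkerErrantRecordReporter workerErrantRecordReporter =
                createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
   ```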

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Component that the sink task can use as it {@link SinkTask#put(java.util.Collection)}.

Review comment:
       typo: this sentence looks incomplete. Not sure how you want to phrase it. 
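       One possible rewording, purely as a suggestion:
   ```java
    /**
     * Component that the sink task can use to report problematic records
     * encountered during {@link SinkTask#put(java.util.Collection)}.
     */
   ```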

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,41 +115,41 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName().trim();
     }
 
     /**
-     * Write the raw records into a Kafka topic.
+     * Write the raw records into a Kafka topic and return the producer future.
      *
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @return the future associated with the writing of this record; never null
      */
-    public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         ProducerRecord<byte[], byte[]> producerRecord;
         if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
             producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());

Review comment:
       nit: it was correct before

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            reporter = context.errantRecordReporter();
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+

Review comment:
       nit: extra blank line
   ```suggestion
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -219,6 +223,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(this::checkForPartitionAssignment,
+            CONNECTOR_SETUP_DURATION_MS,
+            "Connector tasks were not assigned a partition each.");
+
+        // produce some messages into source topic partitions
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+        }
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced.
+        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+            connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
+
+        // wait for the connector tasks to consume all records.
+        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+        // wait for the connector tasks to commit all records.
+        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
+
+        // consume all records from the dlq topic or fail, to ensure that they were correctly produced
+        int recordNum = connect.kafka().consume(
+            NUM_RECORDS_PRODUCED,
+            RECORD_TRANSFER_DURATION_MS,
+            DLQ_TOPIC
+        ).count();
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+

Review comment:
       ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -518,6 +535,10 @@ private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
         return result;
     }
 
+    WorkerErrantRecordReporter workerErrantRecordReporter() {

Review comment:
       let's add `protected` here, to be symmetric with the scope of the other fields that the context accesses 
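       Assuming the accessor simply returns the field (its body is cut off in the diff), that would be:
   ```java
        protected WorkerErrantRecordReporter workerErrantRecordReporter() {
            return workerErrantRecordReporter;
        }
   ```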

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -94,6 +95,7 @@
     private int commitFailures;
     private boolean pausedForRedelivery;
     private boolean committing;
+    private WorkerErrantRecordReporter workerErrantRecordReporter;

Review comment:
       should be `final` right?
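       i.e., assuming it is only assigned once in the constructor:
   ```java
        private final WorkerErrantRecordReporter workerErrantRecordReporter;
   ```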

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,41 +115,41 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName().trim();
     }
 
     /**
-     * Write the raw records into a Kafka topic.
+     * Write the raw records into a Kafka topic and return the producer future.
      *
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @return the future associated with the writing of this record; never null
      */
-    public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         ProducerRecord<byte[], byte[]> producerRecord;
         if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
             producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
+                originalMessage.key(), originalMessage.value(), originalMessage.headers());
         } else {
             producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());

Review comment:
       nit: it was correct before

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp,
+                                Iterable<Header> headers) {
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }

Review comment:
       these overrides don't seem to add much. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {

Review comment:
       Also, `topic` can never be `null` if it comes from a parsed config value whose default is not `null`. (Another way to think about it: you can't pass a `null` value via properties.)
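       Under that assumption the null check could be dropped, e.g.:
   ```java
        String topic = connConfig.dlqTopicName();
        if (!topic.isEmpty() || connConfig.enableErrorLog()) {
   ```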

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;

Review comment:
       ```suggestion
       protected final List<Future<Void>> futures;
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Awaits the completion of all futures for the errant sink records that this
+     * reporter has sent to Kafka. This method is intended to be used to block on all
+     * outstanding errant record futures.
+     */
+    public void awaitAllFutures() {
+        Future<?> future = null;
+        while ((future = futures.poll()) != null) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future's completion.");
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+
+        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
+            futures = producerFutures;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            for (Future<RecordMetadata> future: futures) {
+                if (!future.isDone()) {
+                    return false;
+                }
+            }
+            return true;

Review comment:
       ```suggestion
               return futures.stream().allMatch(Future::isDone);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
##########
@@ -132,11 +138,25 @@ public void currentContext(Stage stage, Class<?> klass) {
 
     /**
      * Report errors. Should be called only if an error was encountered while executing the operation.
+     *
+     * @return an errant record future that potentially aggregates the producer futures
      */
-    public void report() {
+    public Future<Void> report() {
+        if (reporters.size() == 1) {
+            return new ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(this)));
+        }
+
+        List<Future<RecordMetadata>> futures = new LinkedList<>();

Review comment:
       Suggestion (can't add because of the deleted line): 
   ```
           List<Future<RecordMetadata>> futures = reporters.stream()
                   .map(r -> r.report(this))
                   .filter(Future::isDone)
                   .collect(Collectors.toCollection(LinkedList::new));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Awaits the completion of all futures for the errant sink records that this
+     * reporter has sent to Kafka. This method is intended to be used to block on all
+     * outstanding errant record futures.
+     */
+    public void awaitAllFutures() {
+        Future<?> future = null;

Review comment:
       nit: initialization is not required
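       i.e., the assignment in the loop condition makes the initializer unnecessary:
   ```java
        Future<?> future;
        while ((future = futures.poll()) != null) {
            // ... same body as above
        }
   ```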




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org