Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:49:57 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r429975231



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ

Review comment:
       Also, what exceptions can this method throw? Should we add something like:
   
   ```suggestion
      *         to the DLQ
      * @throws ConnectException if the error reporter and DLQ fail to write a
      *         reported record and are configured with {@code errors.tolerance=none}
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *

Review comment:
       ```suggestion
      * 
      * Connect guarantees that sink records reported through this reporter will be written to the error topic
      * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before
      * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
      * are required.
      * 
   ```
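
To illustrate the "stronger guarantees" case, here is a minimal sketch of a task that blocks on the returned future before moving on to the next record. It is not code from the PR: `Reporter` is a hypothetical stand-in that mirrors the `report(record, error)` signature, records are plain strings rather than `SinkRecord`, and `StrictSinkTask` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in mirroring ErrantRecordReporter#report from the diff above;
// records are plain strings here instead of SinkRecord.
interface Reporter {
    Future<Void> report(String record, Throwable error);
}

class StrictSinkTask {
    private final Reporter reporter;
    final List<String> written = new ArrayList<>();

    StrictSinkTask(Reporter reporter) {
        this.reporter = reporter;
    }

    // Processes a batch. A record that fails is reported, and the task blocks on
    // the returned future so the report completes before the next record is handled.
    void put(List<String> records) throws InterruptedException, ExecutionException {
        for (String record : records) {
            try {
                write(record);
            } catch (IllegalArgumentException e) {
                reporter.report(record, e).get(); // wait now instead of relying on preCommit
            }
        }
    }

    // Illustrative write step: rejects empty records, accepts everything else.
    private void write(String record) {
        if (record.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
        written.add(record);
    }
}
```

A task that is happy with the framework-level guarantee would simply drop the `.get()` call and ignore the returned future.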

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {

Review comment:
       Please add JavaDoc for the interface, with `@since 2.6.0`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(

Review comment:
       Nit: static methods should appear before the non-static fields.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;

Review comment:
       Couldn't this be final?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -680,6 +689,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
                                                                 connectorClientConfigOverridePolicy);
             Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
+

Review comment:
       Nit: let's avoid adding new lines in code otherwise unaffected in the PR.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       Nit formatting:
   ```suggestion
               WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(
                       id, sinkConfig, connectorClass, keyConverter, valueConverter, headerConverter);
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {
+            Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+                connectorClientConfigOverridePolicy);
+            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            return WorkerErrantRecordReporter.createAndSetup(
+                adminProps,
+                producerProps,
+                connConfig,
+                keyConverter,
+                valueConverter,
+                headerConverter
+            );

Review comment:
       Nit formatting:
   ```suggestion
               return WorkerErrantRecordReporter.createAndSetup(adminProps, producerProps,
                   connConfig, keyConverter, valueConverter, headerConverter);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;

Review comment:
       All fields that can be `final` should be marked as such. This provides semantic intent to future developers and helps prevent unintentionally changing the fields in the future.
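
As a rough sketch of what that looks like (a trimmed-down, hypothetical `ReporterSettings` class, not the PR's actual code), fields assigned once in the constructor can be declared `final` so the compiler rejects any later reassignment:

```java
// Hypothetical, trimmed-down class; only two of the reporter's fields are shown.
class ReporterSettings {
    private final String dlqTopic;
    private final boolean useDlq;

    ReporterSettings(String dlqTopic) {
        // Each final field is assigned exactly once, here in the constructor.
        this.dlqTopic = dlqTopic;
        this.useDlq = dlqTopic != null && !dlqTopic.isEmpty();
    }

    String dlqTopic() {
        return dlqTopic;
    }

    boolean useDlq() {
        return useDlq;
    }
}
```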

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       At a higher level, why are we not reusing the `RetryWithToleranceOperator` here? I thought that was kind of the intent of the KIP, that this new `report(...)` method is just one more way to capture problematic records using the existing DLQ functionality. I understand that might require other refactoring of that class (like returning a Future from the produce-like methods), but it seems like it would simplify things substantially by avoiding having to create our own producer and reuse a lot more of the functionality, such as metrics, retry count, logging, using the same producers, etc.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);

Review comment:
       Do we really want to pass the ExecutionException to the ConnectException, or would it be better to pass that exception's *cause* to the ConnectException?
   
   How about log messages here?
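
For illustration, passing the cause could look roughly like the sketch below. It only assumes the JDK: `ReportFailedException` is a hypothetical stand-in for `ConnectException` so the snippet compiles without Kafka on the classpath, `FutureWaiter` is an illustrative name, and logging is left out.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
class ReportFailedException extends RuntimeException {
    ReportFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

class FutureWaiter {
    // Blocks on each future and rethrows the underlying failure, not the wrapper.
    static void waitForAll(List<? extends Future<Void>> futures) {
        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new ReportFailedException("Interrupted while waiting for errant record report", e);
            } catch (ExecutionException e) {
                // getCause() is the exception the asynchronous send actually raised.
                throw new ReportFailedException("Failed to report errant record", e.getCause());
            }
        }
    }
}
```

Handling `InterruptedException` separately also lets the method restore the thread's interrupt flag, which a combined multi-catch makes easy to overlook.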

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);

Review comment:
       Earlier I suggested reusing the `RetryWithToleranceOperator`, which might require adding a `produce`-like method to that class that simply reports a new error. If that method took a `Callback` and passed it to its `producer.send(...)` call, we could provide a callback here that removes the (completed) future from our list, keeping that list as small as possible with only the incomplete futures in it.
   
   If we did that, we'd want to use a `LinkedList` rather than an `ArrayList`, since we're no longer removing futures only from the ends of the list.
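   
   A minimal sketch of the bookkeeping I have in mind, using only the JDK so it runs on its own. `PendingFutureTracker` is a hypothetical name, and `CompletableFuture.whenComplete` stands in for the `Callback` that would be passed to `producer.send(record, callback)`; none of this is the PR's actual code.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of the PR: keeps only the futures that are still incomplete.
class PendingFutureTracker {

    // LinkedList because completed futures may now be removed from anywhere in the list.
    private final List<CompletableFuture<Void>> pending = new LinkedList<>();

    // Registers a future and arranges for it to be dropped from the list once it completes.
    // whenComplete plays the role of the producer Callback in this sketch.
    synchronized CompletableFuture<Void> track(CompletableFuture<Void> sendFuture) {
        pending.add(sendFuture);
        sendFuture.whenComplete((ignored, error) -> remove(sendFuture));
        return sendFuture;
    }

    // Called from the completion callback, possibly on another thread.
    private synchronized void remove(CompletableFuture<Void> completed) {
        pending.remove(completed);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingFutureTracker tracker = new PendingFutureTracker();
        CompletableFuture<Void> first = tracker.track(new CompletableFuture<>());
        tracker.track(new CompletableFuture<>());
        System.out.println(tracker.pendingCount()); // prints 2
        first.complete(null);                       // simulates the send finishing
        System.out.println(tracker.pendingCount()); // prints 1
    }
}
```

   The methods are synchronized because the producer invokes callbacks on its own I/O thread while the task thread keeps adding futures. Note that `List.remove(Object)` still scans the list to find the element, so if the list can grow large an identity-based set may be worth considering instead.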
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;
+
+        public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
+            future = producerFuture;

Review comment:
       If we ensure that the producer future is never null, then we can remove the `if (future == null)`-style checks in this class's methods.
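
   One way to do that, sketched with JDK types only: when the DLQ is disabled and there is no producer future, substitute an already-completed future so the wrapper always has a delegate. `NonNullFutureSketch`, `VoidFuture` and `wrap` are hypothetical names, and the type parameter `T` stands in for `RecordMetadata`; this is an illustration under those assumptions, not the PR's code.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a Future<Void> wrapper whose delegate is guaranteed non-null.
class NonNullFutureSketch {

    // Adapts a Future<T> to Future<Void>; no null checks are needed in any method.
    static final class VoidFuture<T> implements Future<Void> {
        private final Future<T> delegate;

        VoidFuture(Future<T> delegate) {
            this.delegate = Objects.requireNonNull(delegate, "delegate");
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return delegate.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public Void get() throws InterruptedException, ExecutionException {
            delegate.get(); // block until the send finishes, discard the result
            return null;
        }

        @Override
        public Void get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            delegate.get(timeout, unit);
            return null;
        }
    }

    // Replaces a missing send future (DLQ disabled) with one that is already complete.
    static <T> Future<Void> wrap(Future<T> sendFutureOrNull) {
        Future<T> delegate = sendFutureOrNull != null
            ? sendFutureOrNull
            : CompletableFuture.<T>completedFuture(null);
        return new VoidFuture<>(delegate);
    }
}
```

   With this shape, `report(...)` would return a future that is immediately done when nothing was sent, and `waitForAllFutures()` can call `get()` on every entry without special cases.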



