Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/23 01:00:49 UTC

[GitHub] [kafka] aakashnshah opened a new pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

aakashnshah opened a new pull request #8720:
URL: https://github.com/apache/kafka/pull/8720





[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634910469


   ok to test





[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431212650



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}.
+ * Reporter of problematic records and the corresponding problems.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * Connect guarantees that sink records reported through this reporter will be written to the error topic
+   * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before

Review comment:
       You need to qualify `Map` or import it:
   ```suggestion
      * before the framework calls the {@link SinkTask#preCommit(java.util.Map)} method and therefore before
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}.

Review comment:
       This is not legal JavaDoc:
   ```suggestion
    * Component that the sink task can use as it {@link SinkTask#put(java.util.Collection)}.
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;

Review comment:
       When you move `InternalSinkRecord` to the `runtime` module, be sure to make this `private final`.
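    For illustration, a minimal sketch of what that field could look like once the class lives in the `runtime` module, paired with the `originalRecord()` accessor this PR's `WorkerErrantRecordReporter` already calls:
    ```java
    private final ConsumerRecord<byte[], byte[]> originalRecord;

    /**
     * Return the original consumer record that this sink record represents.
     */
    public ConsumerRecord<byte[], byte[]> originalRecord() {
        return originalRecord;
    }
    ```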

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -68,6 +70,14 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
         return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
     }
 
+    public InternalSinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
+                                        long kafkaOffset, Long timestamp,
+                                        TimestampType timestampType, Iterable<Header> headers,
+                                        ConsumerRecord<byte[], byte[]> originalRecord) {
+        return new InternalSinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value,
+            kafkaOffset, timestamp, timestampType, headers, originalRecord);
+    }
+

Review comment:
       You can't change this public API. `InternalSinkRecord` needs to be in the `runtime` module.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg);
+
         log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
-            recordActiveTopic(origRecord.topic());
+            recordActiveTopic(internalSinkRecord.topic());
         }
-        return transformationChain.apply(origRecord);
+        return transformationChain.apply(internalSinkRecord);

Review comment:
       This line would change to:
   ```suggestion
           // Apply the transformations
           SinkRecord transformedRecord = transformationChain.apply(sinkRecord);
           if (transformedRecord == null) {
               // The record is being dropped
               return null;
           }
           // Error reporting will need to correlate each sink record with the original consumer record
           return new InternalSinkRecord(msg, transformedRecord);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -176,6 +182,43 @@ void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, Proce
         }
     }
 
+    public static KafkaProducer<byte[], byte[]> setUpTopicAndProducer(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        int dlqTopicNumPartitions
+    ) {
+        String dlqTopic = sinkConnectorConfig.dlqTopicName();
+
+        if (dlqTopic != null && !dlqTopic.isEmpty()) {
+            try (Admin admin = Admin.create(adminProps)) {
+                if (!admin.listTopics().names().get().contains(dlqTopic)) {
+                    log.error("Topic {} doesn't exist. Will attempt to create topic.", dlqTopic);
+                    NewTopic schemaTopicRequest = new NewTopic(
+                        dlqTopic,
+                        dlqTopicNumPartitions,
+                        sinkConnectorConfig.dlqTopicReplicationFactor()
+                    );
+                    admin.createTopics(singleton(schemaTopicRequest)).all().get();
+                }
+            } catch (InterruptedException e) {
+                throw new ConnectException(
+                    "Could not initialize errant record reporter with topic = " + dlqTopic,
+                    e
+                );
+            } catch (ExecutionException e) {
+                if (!(e.getCause() instanceof TopicExistsException)) {
+                    throw new ConnectException(
+                        "Could not initialize errant record reporter with topic = " + dlqTopic,
+                        e
+                    );
+                }
+            }
+            return new KafkaProducer<>(producerProps);
+        }
+        return null;
+    }
+

Review comment:
    I think we no longer need this change, since it only refactors existing code that we don't actually need to change anymore. (This PR is no longer using this logic in multiple places.)

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
##########
@@ -139,6 +145,18 @@ public void report() {
         }
     }
 
+    public Future<Void> report(Callback callback) {
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (ErrorReporter reporter: reporters) {
+            Future<RecordMetadata> future = reporter.report(this, callback);
+            if (!future.isDone()) {
+                futures.add(future);
+            }
+        }
+        return new ErrantRecordFuture(futures);

Review comment:
       Since we often have just one reporter, it is probably worth avoiding the unnecessary allocations:
   ```suggestion
           if (reporters.size() == 1) {
               return reporters.get(0).report(this);
           }
           List<Future<RecordMetadata>> futures = new LinkedList<>();
           for (ErrorReporter reporter: reporters) {
               Future<RecordMetadata> future = reporter.report(this, callback);
               if (!future.isDone()) {
                   futures.add(future);
               }
           }
           if (futures.isEmpty()) {
               return CompletableFuture.completedFuture(null);
           }
           return new ErrantRecordFuture(futures);
   ```
   And since we don't know how many futures we'll add to the list (and it will likely be just zero if the DLQ is not configured or just one for the DLQ), let's use a `LinkedList` instead to avoid excessive allocation when adding the first element to the `ArrayList`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by the sink records sent to Kafka by the errant
+     * record reporter. This function is intended to be used to block on all the errant record
+     * futures.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future", e);
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+

Review comment:
       Let's avoid unnecessary blank lines.
   ```suggestion
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}.
+ * Reporter of problematic records and the corresponding problems.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * Connect guarantees that sink records reported through this reporter will be written to the error topic
+   * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before
+   * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
+   * are required.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ
+   * @throws ConnectException if the error reporter and DLQ fails to write a reported record

Review comment:
       You need to qualify `ConnectException` or import it. The latter is probably better in this case to make the JavaDoc more readable in the code.
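    For example, a minimal sketch of the import approach (the import path matches the one already used in `WorkerErrantRecordReporter`):
    ```java
    import org.apache.kafka.connect.errors.ConnectException;

    // ...the JavaDoc tag can then stay short and readable:
    // @throws ConnectException if the error reporter and DLQ fail to write a reported record
    ```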

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg);
+

Review comment:
       Let's not create the `InternalSinkRecord` until *after* the transformation chain has been applied. That way we're not affected by any SMT that creates a new `SinkRecord` via a constructor (where we'd lose our `InternalSinkRecord`) rather than `newRecord(...)` (where we'd keep the `InternalSinkRecord`).
   ```suggestion
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;
+
+        public InternalSinkRecord(String topic, int partition, Schema keySchema, Object key,
+                                  Schema valueSchema, Object value, long kafkaOffset,
+                                  Long timestamp, TimestampType timestampType,
+                                  Iterable<Header> headers,
+                                  ConsumerRecord<byte[], byte[]> originalRecord) {
+            super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp,
+                timestampType, headers);
+            this.originalRecord = originalRecord;
+

Review comment:
       First of all, let's avoid adding unnecessary blank lines.
   
   Second, when you move this to `runtime`, you won't need to use this constructor and instead could use a much more straightforward one:
   ```
            public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
               super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                       record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
                       record.timestampType(), record.headers());
               this.originalRecord = Objects.requireNonNull(originalRecord);
           }
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
##########
@@ -95,4 +95,30 @@
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records}
+     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,

Review comment:
       This is invalid:
   ```suggestion
        * passed to the {@link SinkTask#put(java.util.Collection)} method. When reporting a failed record,
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;

Review comment:
    Why use a function here? We can use a simple variable instead.

    (I suggested a function offline to avoid having to pass in the converters. But passing the converters into this class encapsulates this logic nicely.)
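    A minimal sketch of that simplification, assuming `executeFailed` is reshaped to take the raw record directly (headers handling elided for brevity):
    ```java
    ConsumerRecord<byte[], byte[]> consumerRecord;
    if (record instanceof InternalSinkRecord) {
        // The runtime already tracked the raw record that produced this sink record
        consumerRecord = ((InternalSinkRecord) record).originalRecord();
    } else {
        // Otherwise convert the sink record back to raw bytes with the task's converters
        byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
        byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
            record.kafkaOffset(), key, value);
    }
    Future<Void> future = retryWithToleranceOperator.executeFailed(consumerRecord,
        Stage.TASK_PUT, SinkTask.class, record, error);
    ```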

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This method allows for a custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record is sent to Kafka.
+     * @return

Review comment:
       Missing JavaDoc details:
   ```suggestion
        * @return the future associated with the writing of this record; never null
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;
+
+        public InternalSinkRecord(String topic, int partition, Schema keySchema, Object key,
+                                  Schema valueSchema, Object value, long kafkaOffset,
+                                  Long timestamp, TimestampType timestampType,
+                                  Iterable<Header> headers,
+                                  ConsumerRecord<byte[], byte[]> originalRecord) {
+            super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp,
+                timestampType, headers);
+            this.originalRecord = originalRecord;
+
+        }
+
+        /**
+         *
+         * @return the original consumer record that was converted to this sink record.
+         */

Review comment:
       If we're going to add JavaDoc, which I think is helpful, then make it complete by adding a description:
   ```suggestion
           /**
            * Return the original consumer record that this sink record represents.
            *
            * @return the original consumer record; never null
            */
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -552,14 +553,18 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig,
+                errorHandlingMetrics, connectorClass));

Review comment:
    I don't think this line was actually changed other than formatting. Please revert it to avoid changing lines we don't have to.

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
##########
@@ -95,4 +95,30 @@
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records}
+     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
+     * the sink task will receive a {@link Future} that the task can optionally use to wait until

Review comment:
       This is invalid in JavaDoc:
   ```suggestion
        * the sink task will receive a {@link java.util.concurrent.Future} that the task can optionally use to wait until
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg);
+
         log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
-            recordActiveTopic(origRecord.topic());
+            recordActiveTopic(internalSinkRecord.topic());

Review comment:
       This line would not need to be affected.
   ```suggestion
               recordActiveTopic(sinkRecord.topic());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -67,30 +72,22 @@
     private final SinkConnectorConfig connConfig;
     private final ConnectorTaskId connectorTaskId;
     private final ErrorHandlingMetrics errorHandlingMetrics;
+    private final String dlqTopicName;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
 
     public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
                                                          ConnectorTaskId id,
                                                          SinkConnectorConfig sinkConfig, Map<String, Object> producerProps,
                                                          ErrorHandlingMetrics errorHandlingMetrics) {
-        String topic = sinkConfig.dlqTopicName();
 
-        try (Admin admin = Admin.create(adminProps)) {
-            if (!admin.listTopics().names().get().contains(topic)) {
-                log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
-                NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
-                admin.createTopics(singleton(schemaTopicRequest)).all().get();
-            }
-        } catch (InterruptedException e) {
-            throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
-        } catch (ExecutionException e) {
-            if (!(e.getCause() instanceof TopicExistsException)) {
-                throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
-            }
-        }
 
-        KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
+        KafkaProducer<byte[], byte[]> dlqProducer = setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );

Review comment:
       We don't need to make this change, do we? Let's try to minimize the changes to the existing code.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -83,6 +87,17 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
         this.time = time;
     }
 
+    public Future<Void> executeFailed(Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function,
+                                      Stage stage, Class<?> executingClass, SinkRecord record,
+                                      Throwable error, Callback callback) {
+

Review comment:
       No new line is needed here.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This method allows for a custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record is sent to Kafka.
+     * @return
+     */
+    public Future<RecordMetadata> report(ProcessingContext context, Callback callback) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
-        ProducerRecord<byte[], byte[]> producerRecord;
-        if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
-            producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
-        } else {
-            producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
-        }
+        ProducerRecord<byte[], byte[]> producerRecord =
+            new ProducerRecord<>(dlqTopicName, null,
+                originalMessage.timestamp() != RecordBatch.NO_TIMESTAMP ?
+                    originalMessage.timestamp() : null,
+                originalMessage.key(), originalMessage.value(), originalMessage.headers());

Review comment:
       It'd be better to not change these lines, because we don't intend to change the logic -- yet doing so adds risk and increases the size of this PR.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This method allows for a custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record is sent to Kafka.
+     * @return

Review comment:
    Why do we need to overload this method to pass in a callback? The only place we're using this new method is via the reporter, and `WorkerErrantRecordReporter.callback` doesn't seem to provide any value; in fact, it cannot call `errorHandlingMetrics.recordDeadLetterQueueProduceFailed()` the way this class can.

    Wouldn't it be much simpler to just add a return type to the existing method?
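    A sketch of that simpler shape, keeping the metrics-recording callback inside this class:
    ```java
    public Future<RecordMetadata> report(ProcessingContext context) {
        if (dlqTopicName.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
        if (originalMessage == null) {
            errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            return CompletableFuture.completedFuture(null);
        }
        ProducerRecord<byte[], byte[]> producerRecord;
        if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
            producerRecord = new ProducerRecord<>(dlqTopicName, null,
                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
        } else {
            producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
        }
        // The producer callback records the failure metric; callers just get the future back
        return kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            }
        });
    }
    ```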

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
##########
@@ -28,6 +34,18 @@
      */
     void report(ProcessingContext context);
 
+    /**
+     * Report an error with a specified callback.
+     *
+     * @param context the processing context (cannot be null).
+     * @param callback callback to be invoked by a producer when sending a record to Kafka.
+     * @return future result from the producer sending a record to Kafka
+     */
+    default Future<RecordMetadata> report(ProcessingContext context, Callback callback) {
+        report(context);
+        return CompletableFuture.completedFuture(null);
+    }
+

Review comment:
       I don't think we need to overload this method, and instead we can just change the return type. After all, the `ErrorReporter` is not part of the public API, and is merely an abstraction we use within the runtime itself.
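    For instance, a sketch with only the return type changed (other members of the interface omitted):
    ```java
    interface ErrorReporter {
        /**
         * Report an error.
         *
         * @param context the processing context (cannot be null).
         * @return a future tracking the send of the errant record, or an
         *         already-completed future if nothing needed to be written; never null
         */
        Future<RecordMetadata> report(ProcessingContext context);
    }
    ```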

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);

Review comment:
       How about adding it only if the future is not done already?
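    A minimal sketch of that guard:
    ```java
    if (!future.isDone()) {
        futures.add(future);
    }
    return future;
    ```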

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by the sink records sent to Kafka by the errant
+     * record reporter. This function is intended to be used to block on all the errant record
+     * futures.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future", e);
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+
+
+        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
+            futures = producerFutures;
+        }

Review comment:
    Rather than have a list of futures, why not have a single `Future` delegate that is either a `CompletableFuture.allOf(...)` or a single future? This makes the constructor a little more complex, but it would simplify all of the other methods tremendously since they merely have to delegate (except for `cancel()` and `isCancelled()`, which can stay the same):
    ```suggestion
            public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
                if (producerFutures == null || producerFutures.isEmpty()) {
                    future = CompletableFuture.completedFuture(null);
                } else {
                    future = CompletableFuture.allOf(producerFutures);
                }
            }
    ```
    This will make `get(long, TimeUnit)` behave more correctly by requiring that all futures complete within the stated time.
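    With a single delegate, the remaining methods become one-liners; a sketch, assuming a `CompletableFuture<Void> future` field set by that constructor:
    ```java
    @Override
    public Void get() throws InterruptedException, ExecutionException {
        return future.get();
    }

    @Override
    public Void get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // All aggregated producer futures must complete within the stated time
        return future.get(timeout, unit);
    }

    @Override
    public boolean isDone() {
        return future.isDone();
    }
    ```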







[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430866143



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -214,6 +218,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number of errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number of errors.");

Review comment:
       I don't think it hurts to add this step.







[GitHub] [kafka] kkonstantine commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431512019



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {

Review comment:
       nit: indentation is a bit off here. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {

Review comment:
       A class javadoc would be helpful, in order to understand why this is introduced. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -553,13 +554,16 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter
+                = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,

Review comment:
       nit: I think we always keep `=` with the left operand. 

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Component that the sink task can use as it {@link SinkTask#put(java.util.Collection)}.

Review comment:
       typo. Not sure how you want to say it. 
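
For reference, a minimal usage sketch of the component this javadoc describes, which may help with the rewording. This is illustrative only, not code from the PR; `isMalformed(...)` and `process(...)` are hypothetical helpers.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class ReportingSinkTask extends SinkTask {
    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // may be null if the connector has no error reporting configured
        reporter = context.errantRecordReporter();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            if (reporter != null && isMalformed(record)) {
                // asynchronous by default; append .get() for a blocking call
                reporter.report(record, new DataException("malformed record"));
                continue;
            }
            process(record);
        }
    }

    protected abstract boolean isMalformed(SinkRecord record);

    protected abstract void process(SinkRecord record);
}
```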

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,41 +115,41 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName().trim();
     }
 
     /**
-     * Write the raw records into a Kafka topic.
+     * Write the raw records into a Kafka topic and return the producer future.
      *
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @return the future associated with the writing of this record; never null
      */
-    public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         ProducerRecord<byte[], byte[]> producerRecord;
         if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
             producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());

Review comment:
       nit: it was correct before

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            reporter = context.errantRecordReporter();
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+

Review comment:
       nit: extra blank line
   ```suggestion
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -219,6 +223,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number or errors.");
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(this::checkForPartitionAssignment,
+            CONNECTOR_SETUP_DURATION_MS,
+            "Connector tasks were not assigned a partition each.");
+
+        // produce some messages into source topic partitions
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+        }
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced.
+        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+            connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
+
+        // wait for the connector tasks to consume all records.
+        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+        // wait for the connector tasks to commit all records.
+        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
+
+        // consume all records from the dlq topic or fail, to ensure that they were correctly produced
+        int recordNum = connect.kafka().consume(
+            NUM_RECORDS_PRODUCED,
+            RECORD_TRANSFER_DURATION_MS,
+            DLQ_TOPIC
+        ).count();
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+

Review comment:
       ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -518,6 +535,10 @@ private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
         return result;
     }
 
+    WorkerErrantRecordReporter workerErrantRecordReporter() {

Review comment:
       let's add `protected` here to be symmetric with the scope of the other fields that are accessed by the context

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -94,6 +95,7 @@
     private int commitFailures;
     private boolean pausedForRedelivery;
     private boolean committing;
+    private WorkerErrantRecordReporter workerErrantRecordReporter;

Review comment:
       should be `final` right?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,41 +115,41 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName().trim();
     }
 
     /**
-     * Write the raw records into a Kafka topic.
+     * Write the raw records into a Kafka topic and return the producer future.
      *
      * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @return the future associated with the writing of this record; never null
      */
-    public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         ProducerRecord<byte[], byte[]> producerRecord;
         if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
             producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
+                originalMessage.key(), originalMessage.value(), originalMessage.headers());
         } else {
             producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
-                    originalMessage.key(), originalMessage.value(), originalMessage.headers());

Review comment:
       nit: it was correct before

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp,
+                                Iterable<Header> headers) {
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }

Review comment:
       these overrides don't seem to add much. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {

Review comment:
       Also, topic can never be `null` if it's coming from a parsed config value that doesn't have `null` as its default value. (Another way to think of it: you can't pass a `null` value through properties.)
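
A tiny illustration of that point (hedged sketch, not code from this PR): a value defined with a non-null default never parses to `null`, because an absent key falls back to the default and a properties file cannot express `null`.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class NonNullDefaultExample {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
            "errors.deadletterqueue.topic.name",
            ConfigDef.Type.STRING,
            "",                                // non-null (empty string) default
            ConfigDef.Importance.MEDIUM,
            "dead letter queue topic name");

        // no property supplied at all: the parsed value is "" (the default), never null
        Map<String, Object> parsed = def.parse(Collections.emptyMap());
        System.out.println("null? " + (parsed.get("errors.deadletterqueue.topic.name") == null)); // null? false
    }
}
```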

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;

Review comment:
       ```suggestion
       protected final List<Future<Void>> futures;
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Awaits the completion of all futures for the errant records sent to Kafka by this
+     * reporter. Intended to be used to block on all outstanding errant record futures.
+     */
+    public void awaitAllFutures() {
+        Future<?> future = null;
+        while ((future = futures.poll()) != null) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future's completion.");
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+
+        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
+            futures = producerFutures;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            for (Future<RecordMetadata> future: futures) {
+                if (!future.isDone()) {
+                    return false;
+                }
+            }
+            return true;

Review comment:
       ```suggestion
               return futures.stream().allMatch(Future::isDone);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
##########
@@ -132,11 +138,25 @@ public void currentContext(Stage stage, Class<?> klass) {
 
     /**
      * Report errors. Should be called only if an error was encountered while executing the operation.
+     *
+     * @return an errant record future that potentially aggregates the producer futures
      */
-    public void report() {
+    public Future<Void> report() {
+        if (reporters.size() == 1) {
+            return new ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(this)));
+        }
+
+        List<Future<RecordMetadata>> futures = new LinkedList<>();

Review comment:
       Suggestion (can't add because of the deleted line): 
   ```
           List<Future<RecordMetadata>> futures = reporters.stream()
                   .map(r -> r.report(this))
                   .filter(Future::isDone)
                   .collect(Collectors.toCollection(LinkedList::new));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Awaits the completion of all futures for the errant records sent to Kafka by this
+     * reporter. Intended to be used to block on all outstanding errant record futures.
+     */
+    public void awaitAllFutures() {
+        Future<?> future = null;

Review comment:
       nit: initialization is not required
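
What the nit amounts to, sketched against the quoted loop (same behavior as the PR, just without the redundant initializer):

```java
// fragment of awaitAllFutures(), as quoted above, with the nit applied
Future<?> future;
while ((future = futures.poll()) != null) {
    try {
        future.get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("Encountered an error while awaiting an errant record future's completion.");
        throw new ConnectException(e);
    }
}
```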






[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635071561


   Three green builds of commit `126b04`:
   * https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2552/
   * https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/710/
   * https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6571/
   
   @aakashnshah, let's get fixes for @kkonstantine's suggestions. Thanks!




[GitHub] [kafka] gharris1727 commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430761599



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {

Review comment:
       Does this have an unbounded waiting time? How does this interact with `task.shutdown.graceful.timeout.ms`? What is the delivery guarantee of these error reports?
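
For discussion, one way the wait could be bounded, sketched under the assumption that a timeout budget is plumbed in (for example, derived from `task.shutdown.graceful.timeout.ms`); the PR as written does not pass such a value through:

```java
// Hedged sketch only: 'timeoutMs' is a hypothetical budget shared across all
// outstanding reports, not a parameter this PR defines.
public void waitForAllFutures(long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    for (Future<Void> future : errantRecordFutures) {
        long remainingMs = Math.max(0L, deadline - System.currentTimeMillis());
        try {
            future.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new ConnectException("Timed out waiting for errant record reports", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new ConnectException(e);
        }
    }
    errantRecordFutures.clear();
}
```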

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6

Review comment:
       Does this test ever encounter this exception? I don't think we will be able to backport this test to < 2.6 because the method won't exist at all, much less generate the exception that is being caught here.
   If anything, this generates a less informative NPE later in `put`, and hides the actual root cause.
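
For comparison, a hedged sketch of a probe that does degrade gracefully on pre-2.6 workers, where it is the method lookup itself that fails at call time:

```java
@Override
public void start(Map<String, String> props) {
    super.start(props);
    try {
        // may still be null if the connector has no error reporting configured
        reporter = context.errantRecordReporter();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        // running on a pre-2.6 Connect runtime: the context method (and possibly
        // the ErrantRecordReporter class itself) does not exist there
        reporter = null;
    }
}
```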






[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634916959


   retest this please




[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634721500


   ok to test




[GitHub] [kafka] kkonstantine commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635139703


   2/3 builds are green. Merging now to allow other PRs to resolve conflicts sooner rather than later, if needed.




[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634955248


   ok to test




[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635085234


   ok to test




[GitHub] [kafka] levzem commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
levzem commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r429689645



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).

Review comment:
       this is separate from the Connect DLQ - maybe we should avoid mentioning it in the docs, to make clear it's a different concept?
   
   ```suggestion
      * Report a problematic record and the corresponding error to be written to the sink
      * connector's error topic.
   ```
   
   or something similar

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+

Review comment:
       nit: double blank line

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ

Review comment:
       same as above

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );

Review comment:
       ```suggestion
                   log.error(
                       "Error processing record in topic {} partition {} at offset {}", 
                        record.topic(),
                        record.partition(),
                        record.kafkaOffset(),
                       error
                   );
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;
+
+        public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
+            future = producerFuture;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return future == null || future.isDone();
+        }
+
+        public Void get() throws InterruptedException, ExecutionException {
+            if (future != null) {

Review comment:
       any reason for this to ever be `null`?
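
Context for the question: in the diff above, `producerFuture` is left `null` whenever no DLQ topic is configured and only error logging is enabled, so a null future stands for a report with nothing to wait on. Under that reading, `get()` would look like this sketch, completing the truncated method above:

```java
public Void get() throws InterruptedException, ExecutionException {
    // A null producer future means no DLQ write was issued for this report
    // (error logging only), so there is nothing to block on.
    if (future != null) {
        future.get();
    }
    return null;
}
```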

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporterTest.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerErrantRecordReporterTest {
+
+    private WorkerErrantRecordReporter reporter;
+
+    private KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+    private SinkConnectorConfig sinkConnectorConfig = mock(SinkConnectorConfig.class);
+    private Converter converter = mock(Converter.class);
+    private HeaderConverter headerConverter = mock(HeaderConverter.class);
+    private SinkRecord record = mock(SinkRecord.class);
+
+    @Before
+    public void setup() {
+      reporter = new WorkerErrantRecordReporter(
+            producer,
+            sinkConnectorConfig,
+            converter,
+            converter,
+            headerConverter
+        );
+    }
+
+    @Test
+    public void testReport() {
+      when(sinkConnectorConfig.dlqTopicName()).thenReturn("dlq-topic");
+      when(sinkConnectorConfig.enableErrorLog()).thenReturn(false);
+      reporter.report(record, new Throwable());

Review comment:
       maybe make this into a mock as well and at least assert on some method calls using `verify()`
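
For example, a minimal version of that assertion, assuming Mockito's statically imported `verify` and `any` and a configuration that enables the DLQ path:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

when(sinkConnectorConfig.dlqTopicName()).thenReturn("dlq-topic");
when(sinkConnectorConfig.enableErrorLog()).thenReturn(false);
reporter.report(record, new Throwable());

// assert that the errant record was actually handed to the DLQ producer
verify(producer).send(any(ProducerRecord.class));
```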

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+

Review comment:
       nit: two consecutive blank lines

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);

Review comment:
       ```suggestion
                   log.error("Error processing record: {}", record.toString(), error);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {

Review comment:
       would be nice if this check were a method in the config
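
Something along these lines on `SinkConnectorConfig` would capture it (the method name is hypothetical):

```java
// Hypothetical helper on SinkConnectorConfig; the name is illustrative.
// Mirrors the check above: the reporter is needed if either a DLQ topic
// is configured or error logging is enabled.
public boolean enableErrantRecordReporter() {
    String dlqTopic = dlqTopicName();
    return enableErrorLog() || (dlqTopic != null && !dlqTopic.isEmpty());
}
```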

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();

Review comment:
       should you remove the future from the list when you successfully `get()`?
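
One possible shape, assuming `waitForAllFutures()` is only invoked from the task thread so plain iterator removal is safe (requires a `java.util.Iterator` import):

```java
public void waitForAllFutures() {
    Iterator<ErrantRecordFuture> iterator = errantRecordFutures.iterator();
    while (iterator.hasNext()) {
        try {
            iterator.next().get();
            // Drop completed futures so the list does not grow without
            // bound over the lifetime of the task.
            iterator.remove();
        } catch (InterruptedException | ExecutionException e) {
            throw new ConnectException(e);
        }
    }
}
```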

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -214,6 +218,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");

Review comment:
       extract to variable
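
E.g. (constant name hypothetical):

```java
// alongside DLQ_TOPIC in the test class
private static final String SINK_TOPIC = "test-topic";

// then in the test body
connect.kafka().createTopic(SINK_TOPIC, NUM_TOPIC_PARTITIONS);
props.put(TOPICS_CONFIG, SINK_TOPIC);
```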

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -214,6 +218,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number of errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number of errors.");
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(this::checkForPartitionAssignment,
+            CONNECTOR_SETUP_DURATION_MS,
+            "Connector tasks were not assigned a partition each.");

Review comment:
       ```suggestion
           waitForCondition(
               this::checkForPartitionAssignment,
               CONNECTOR_SETUP_DURATION_MS,
               "Connector tasks were not assigned a partition each."
            );
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
##########
@@ -214,6 +218,72 @@ public void testSourceConnector() throws Exception {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        connect.kafka().createTopic(DLQ_TOPIC, 1);
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // validate the intended connector configuration, a config that errors
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+            "Validating connector configuration produced an unexpected number of errors.");
+
+        // add missing configuration to make the config valid
+        props.put("name", CONNECTOR_NAME);
+
+        // validate the intended connector configuration, a valid config
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+            "Validating connector configuration produced an unexpected number of errors.");

Review comment:
       do we need this invalid config step here?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporterTest.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerErrantRecordReporterTest {
+
+    private WorkerErrantRecordReporter reporter;
+

Review comment:
       ```suggestion
   ```
   
   nit







[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430866479



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporterTest.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerErrantRecordReporterTest {
+
+    private WorkerErrantRecordReporter reporter;
+
+    private KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+    private SinkConnectorConfig sinkConnectorConfig = mock(SinkConnectorConfig.class);
+    private Converter converter = mock(Converter.class);
+    private HeaderConverter headerConverter = mock(HeaderConverter.class);
+    private SinkRecord record = mock(SinkRecord.class);
+
+    @Before
+    public void setup() {
+      reporter = new WorkerErrantRecordReporter(
+            producer,
+            sinkConnectorConfig,
+            converter,
+            converter,
+            headerConverter
+        );
+    }
+
+    @Test
+    public void testReport() {
+      when(sinkConnectorConfig.dlqTopicName()).thenReturn("dlq-topic");
+      when(sinkConnectorConfig.enableErrorLog()).thenReturn(false);
+      reporter.report(record, new Throwable());

Review comment:
       I've changed the test. Please check it out again.







[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634957589


   retest this please





[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634953467


   Retest this please.





[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430863006



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ

Review comment:
       Yeah, good idea to add the `@throws` annotation. I'm changing the description, since it will throw a `ConnectException` if there is an error reporting the record, no matter what `error.tolerance` is.







[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r429975231



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ

Review comment:
       Also, what exceptions can this method throw? Should we add something like:
   
   ```suggestion
      *         to the DLQ
      * @throws ConnectException if the error reporter and DLQ fail to write a 
      *         reported record and are configured with {@code error.tolerance=NONE}
   ```

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *

Review comment:
       ```suggestion
      * 
      * Connect guarantees that sink records reported through this reporter will be written to the error topic
      * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before
      * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
      * are required.
      * 
   ```
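
To illustrate the "stronger guarantees" case, a hypothetical `SinkTask.put` that blocks on the returned future so a failed DLQ write fails the task immediately (`process` and `reporter` are assumed members of the task, not part of this PR):

```java
@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        try {
            process(record); // connector-specific delivery logic (assumed)
        } catch (Exception e) {
            try {
                // Block until the errant record is durably reported before
                // moving on, trading throughput for a per-record guarantee.
                reporter.report(record, e).get();
            } catch (InterruptedException | ExecutionException reportFailure) {
                throw new ConnectException("Failed to report errant record", reportFailure);
            }
        }
    }
}
```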

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {

Review comment:
       Please add JavaDoc for the interface, with `@since 2.6.0`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(

Review comment:
       Nit: static methods should appear before the non-static fields.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;

Review comment:
       Couldn't this be final?
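
i.e., presumably just (keeping the package-private visibility):

```java
final Future<RecordMetadata> future;
```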

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -680,6 +689,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
                                                                 connectorClientConfigOverridePolicy);
             Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
+

Review comment:
       Nit: let's avoid adding blank lines in code that is otherwise unaffected by the PR.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       Nit formatting:
   ```suggestion
               WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(
                       id, sinkConfig, connectorClass, keyConverter, valueConverter, headerConverter);
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {
+            Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+                connectorClientConfigOverridePolicy);
+            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            return WorkerErrantRecordReporter.createAndSetup(
+                adminProps,
+                producerProps,
+                connConfig,
+                keyConverter,
+                valueConverter,
+                headerConverter
+            );

Review comment:
       Nit formatting:
   ```suggestion
               return WorkerErrantRecordReporter.createAndSetup(adminProps, producerProps,
                   connConfig, keyConverter, valueConverter, headerConverter);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;

Review comment:
       All fields that can be `final` should be marked as such. This signals intent to future developers and helps prevent the fields from being unintentionally reassigned later.
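
   As a fragment (illustrative only, imports omitted):
   ```java
   // Illustrative only: a final field must be assigned exactly once, here in
   // the constructor, and the compiler rejects any later reassignment.
   public class WorkerErrantRecordReporter {
       private final KafkaProducer<byte[], byte[]> producer;
       private final Converter keyConverter;
       // ... remaining fields marked final in the same way ...

       public WorkerErrantRecordReporter(KafkaProducer<byte[], byte[]> producer,
                                         Converter keyConverter) {
           this.producer = producer;
           this.keyConverter = keyConverter;
       }
   }
   ```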

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       At a higher level, why are we not reusing the `RetryWithToleranceOperator` here? I thought that was kind of the intent of the KIP: this new `report(...)` method is just one more way to capture problematic records using the existing DLQ functionality. I understand that might require other refactoring of that class (like returning a Future from the produce-like methods), but it seems like it would simplify things substantially by avoiding having to create our own producer and by reusing a lot more of the existing functionality, such as metrics, retry counts, logging, and the same producers.
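
   For illustration only, a produce-like method on `RetryWithToleranceOperator` might look roughly like this (the method name and a future-returning `context.report()` are assumptions, not the current API):
   ```java
   // Hypothetical sketch: reuse the operator's already-configured reporters
   // (DLQ producer, error log, metrics) and surface the DLQ send future.
   public Future<RecordMetadata> executeFailed(ProcessingContext context, Throwable error) {
       context.error(error);                              // record the failure in the context
       Future<RecordMetadata> future = context.report();  // assumed to return the producer future
       if (!withinToleranceLimits()) {
           throw new ConnectException("Tolerance exceeded in error handler", error);
       }
       return future;
   }
   ```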

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);

Review comment:
       Do we really want to pass the ExecutionException to the ConnectException, or would it be better to pass that exception's *cause* instead?

   How about adding log messages here as well?
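
   For example, a sketch that unwraps the cause and logs before rethrowing (log wording is illustrative):
   ```java
   public void waitForAllFutures() {
       for (ErrantRecordFuture future : errantRecordFutures) {
           try {
               future.get();
           } catch (InterruptedException e) {
               log.error("Interrupted while awaiting errant record futures", e);
               throw new ConnectException(e);
           } catch (ExecutionException e) {
               // surface the underlying producer failure, not the wrapper
               log.error("Failed to send errant record to the DLQ", e.getCause());
               throw new ConnectException(e.getCause());
           }
       }
   }
   ```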

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);

Review comment:
       I suggested earlier reusing the `RetryWithToleranceOperator`, which might require adding a `produce`-like method to that class that simply reports a new error. If that method took a `Callback` here and passed it to its `producer.send(...)` call, we could provide a callback that removes each (completed) future from our list, keeping the list as small as possible by holding only the incomplete futures.
   
   If we did that, we'd want to use a `LinkedList` rather than an `ArrayList`, since we're no longer removing futures only from the ends of the list.
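
   A minimal sketch of that callback idea (the synchronization and `removeIf` pruning are illustrative choices, not the PR's code):
   ```java
   // Hypothetical: prune completed futures inside the producer callback so the
   // pending list stays small; a LinkedList keeps mid-list removal cheap.
   Future<RecordMetadata> sendFuture = producer.send(errantRecord, (metadata, exception) -> {
       synchronized (errantRecordFutures) {
           errantRecordFutures.removeIf(Future::isDone);
       }
   });
   synchronized (errantRecordFutures) {
       errantRecordFutures.add(new ErrantRecordFuture(sendFuture));
   }
   ```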
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;
+
+        public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
+            future = producerFuture;

Review comment:
       If we ensure that the producer future is never null, then we can remove the `if (future == null)` checks in this class's methods.
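
   For example (the `COMPLETED` constant is hypothetical):
   ```java
   // Hypothetical: fall back to an already-completed future so the other
   // methods in this class never need a null check.
   private static final Future<RecordMetadata> COMPLETED = CompletableFuture.completedFuture(null);

   public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
       future = producerFuture != null ? producerFuture : COMPLETED;
   }
   ```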




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634908415






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431557717



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp,
+                                Iterable<Header> headers) {
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }

Review comment:
       never mind then. I'll leave this to AI. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634957054


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634956576


   Closing to try to fix builds; will reopen shortly.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634970551


   retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635086497


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine merged pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #8720:
URL: https://github.com/apache/kafka/pull/8720


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634910565


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431564671



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;

Review comment:
       Unfortunately, this doesn't allow `poll()` to be recognized.
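
   If the goal is just to avoid exposing the concrete type, declaring the field as a `Deque` would keep `poll()` available; a hypothetical alternative:
   ```java
   // Hypothetical: java.util.Deque exposes poll() without committing callers
   // to the concrete LinkedList type.
   private final Deque<Future<Void>> futures = new LinkedList<>();

   // poll() removes and returns the head, or null when the deque is empty:
   Future<Void> next = futures.poll();
   ```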




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430864187



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);

Review comment:
       Thanks for the suggestion. I've removed this part of the code completely.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431395579



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
##########
@@ -16,17 +16,25 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
 /**
  * Report an error using the information contained in the {@link ProcessingContext}.
  */
 public interface ErrorReporter extends AutoCloseable {
 
     /**
-     * Report an error.
+     * Report an error and return the producer future.
      *
      * @param context the processing context (cannot be null).
+     * @return future result from the producer sending a record to Kafka.
      */
-    void report(ProcessingContext context);
+    default Future<RecordMetadata> report(ProcessingContext context) {
+        return CompletableFuture.completedFuture(null);
+    }

Review comment:
       We should not make this method a default method, since both implementations of the interface define this method.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
##########
@@ -50,17 +53,18 @@ public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandling
      * @param context the processing context.
      */
     @Override
-    public void report(ProcessingContext context) {
+    public Future<RecordMetadata> report(ProcessingContext context) {
         if (!connConfig.enableErrorLog()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         if (!context.failed()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         log.error(message(context), context.error());
         errorHandlingMetrics.recordErrorLogged();
+        return CompletableFuture.completedFuture(null);

Review comment:
       What about creating:
   ```
       private static final Future<RecordMetadata> COMPLETED = CompletableFuture.completedFuture(null);
   ```
   
   and then returning that instance in all of these places. Since it's already completed, immutable, and we don't allow cancellation, it should be fine to reuse in this `LogReporter`.
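
   Each early return would then just reuse that instance, e.g.:
   ```java
   if (!connConfig.enableErrorLog() || !context.failed()) {
       return COMPLETED; // safe to share: already completed and never cancelled
   }
   ```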

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();

Review comment:
       How about clarifying this a bit:
   ```suggestion
               // Generate a new consumer record from the modified sink record. We prefer
               // to send the original consumer record (pre-transformed) to the DLQ, 
               // but in this case we don't have one and send the potentially transformed
               // record instead
               String topic = record.topic();
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                -1, key, value, headers);

Review comment:
       We should use the length of the key and value in the record:
   ```suggestion
               int keyLength = key != null ? key.length : -1;
               int valLength = value != null ? value.length : -1;
               consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
                   record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
                   valLength, key, value, headers);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -360,6 +364,10 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        if (workerErrantRecordReporter != null) {
+            workerErrantRecordReporter.awaitAllFutures();

Review comment:
       Once again, please add trace log messages before and after this line.
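
   For example (message wording is illustrative):
   ```java
   if (workerErrantRecordReporter != null) {
       log.trace("{} Awaiting all reported errant record futures to complete", this);
       workerErrantRecordReporter.awaitAllFutures();
       log.trace("{} Completed all reported errant record futures", this);
   }
   ```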




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431357309



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.sink;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;

Review comment:
       This leftover line should be removed.
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }

Review comment:
       We need to override the `newRecord(...)` that has all the parameters:
   ```suggestion
       public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
           this(originalRecord, record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                   record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
                   record.timestampType(), record.headers());
       }
   
       public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord,
           String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
           Long timestamp, TimestampType timestampType, Iterable<Header> headers
       ) {
           super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
           this.originalRecord = originalRecord;
       }
   
       @Override
       public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
               Long timestamp, Iterable<Header> headers) {
           return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
                valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers);
       }
   
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
##########
@@ -28,6 +33,17 @@
      */
     void report(ProcessingContext context);
 
+    /**
+     * Report an error and return the producer future.
+     *
+     * @param context the processing context (cannot be null).
+     * @return future result from the producer sending a record to Kafka
+     */
+    default Future<RecordMetadata> reportAndReturnFuture(ProcessingContext context) {
+        report(context);
+        return CompletableFuture.completedFuture(null);
+    }
+

Review comment:
       This is an internal API, so why can we not just change the existing `report(...)` method to return `Future<?>`?
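
       For context, a minimal sketch of what that change could look like (a sketch only, with unrelated members omitted; the actual internal interface may differ):
   ```java
   package org.apache.kafka.connect.runtime.errors;

   import java.util.concurrent.Future;

   import org.apache.kafka.clients.producer.RecordMetadata;

   // Sketch: return the producer future from the existing internal report(...)
   // method instead of adding a separate reportAndReturnFuture(...) default.
   // Reporters that do no I/O (e.g. a log-only reporter) can simply return an
   // already-completed future.
   public interface ErrorReporter {
       Future<RecordMetadata> report(ProcessingContext context);
   }
   ```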

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());

Review comment:
       ```suggestion
               byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), record.value());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                -1, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Waits on all futures for errant records that this reporter has sent to Kafka.
+     * This method is intended to block until every outstanding errant record future
+     * has completed.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future's completion");
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();

Review comment:
       Let's use the queue-style access, since it saves us from having to clear the list and would work if we need it to be concurrent.
   ```suggestion
           Future<?> future = null;
           while ((future = futures.poll()) != null) {
               try {
                   future.get();
               } catch (InterruptedException | ExecutionException e) {
                log.error("Encountered an error while awaiting an errant record future's completion");
                   throw new ConnectException(e);
               }
           }
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +505,21 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
         log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
             recordActiveTopic(origRecord.topic());
         }
-        return transformationChain.apply(origRecord);
+
+        // Apply the transformations
+        SinkRecord transformedRecord = transformationChain.apply(origRecord);
+

Review comment:
       Nit: let's remove this blank line, since there already are quite a few.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -360,6 +364,10 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        if (workerErrantRecordReporter != null) {
+            workerErrantRecordReporter.getAllFutures();

Review comment:
       Let's add a trace log message before and after this call.
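
       For instance, something along these lines (a sketch; the exact log wording is just an example, and `awaitAllFutures()` follows the rename suggested below):
   ```java
   // Inside WorkerSinkTask.commitOffsets(...), before task.preCommit(...) is invoked;
   // log and workerErrantRecordReporter are existing fields of WorkerSinkTask.
   if (workerErrantRecordReporter != null) {
       log.trace("{} Awaiting all reported errant record futures to complete", this);
       workerErrantRecordReporter.awaitAllFutures();
       log.trace("{} Completed all reported errant record futures", this);
   }
   ```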

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -21,11 +21,13 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+

Review comment:
       Nit: let's remove this blank line, since it's unrelated to other changes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,6 +116,7 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName();

Review comment:
       Should we trim this?
   ```suggestion
           this.dlqTopicName = connConfig.dlqTopicName().trim();
   ```
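
       (Rationale, for the record: stray whitespace in the configured value would otherwise be treated as part of the topic name, and whitespace is not a legal topic-name character, so topic creation would break.)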

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {

Review comment:
       ```suggestion
           // Most of the records will be an internal sink record, but the task could potentially
           // report modified or new records, so handle both cases
           if (record instanceof InternalSinkRecord) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                -1, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Waits on all futures for errant records that this reporter has sent to Kafka.
+     * This method is intended to block until every outstanding errant record future
+     * has completed.
+     */
+    public void getAllFutures() {

Review comment:
       Let's rename this to `awaitAllFutures()` since this really is not a getter method.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +505,21 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+

Review comment:
       Nit: let's remove this blank line, since it's unrelated to other changes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;

Review comment:
       This can be package protected and final:
   ```suggestion
       final LinkedList<Future<Void>> futures;
   ```
   

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6

Review comment:
       @aakashnshah let's remove this comment




----------------------------------------------------------------



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635013768


   Not sure what's happening, but here are some finally-running builds from the previous commit (0e40):
   * https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2548/
   * https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6569/
   * https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/708/


----------------------------------------------------------------



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430831919



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {

Review comment:
       Yes, the waiting time is unbounded. The delivery guarantee is that all errant records up to the latest offset in `preCommit()` will be sent to Kafka before `preCommit()` is invoked.
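
       To make the ordering concrete, here is roughly where that guarantee is enforced on the worker side (a sketch only, with imports and surrounding fields elided; method names follow the `awaitAllFutures()` rename suggested elsewhere in this thread):
   ```java
   // In WorkerSinkTask: drain every outstanding errant-record future before
   // asking the task which offsets are safe to commit.
   private void commitOffsets(long now, boolean closing) {
       if (workerErrantRecordReporter != null) {
           // Unbounded wait: returns only once all reported records are in Kafka
           workerErrantRecordReporter.awaitAllFutures();
       }
       Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
               task.preCommit(new HashMap<>(currentOffsets));
       // ... the usual commit path continues from here
   }
   ```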

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6

Review comment:
       Yeah, I should remove this comment. This test will never hit that exception, since the class and method are always present in the runtime the test runs against.




----------------------------------------------------------------



[GitHub] [kafka] rhauch closed pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch closed pull request #8720:
URL: https://github.com/apache/kafka/pull/8720


   


----------------------------------------------------------------



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430863637



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {

Review comment:
       Nice idea, although unfortunately no new configs were introduced in the KIP, and I can't add one to the implementation if it wasn't part of the voted-on design.




----------------------------------------------------------------



[GitHub] [kafka] C0urante commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431434538



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {

Review comment:
       The `SinkConnectorConfig` class isn't part of the public API; we can modify it without a KIP if we want.
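
       For instance, a small internal helper could centralize the check without any KIP (hypothetical method name; `dlqTopicName()` and `enableErrorLog()` already exist on this class):
   ```java
   // In SinkConnectorConfig (internal runtime class, not public API):
   // true when either error-reporting destination is configured.
   public boolean enableErrantRecordReporter() {
       String dlqTopic = dlqTopicName();
       return enableErrorLog() || (dlqTopic != null && !dlqTopic.trim().isEmpty());
   }
   ```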




----------------------------------------------------------------



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430860854



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).

Review comment:
       Errant records are now written to the same DLQ, so I'm going to keep this wording as is.




----------------------------------------------------------------



[GitHub] [kafka] kkonstantine commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431612737



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;

Review comment:
       Is that for tests? Anyway, we can revisit it in a future cleanup.




----------------------------------------------------------------



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634970649


   retest this please


----------------------------------------------------------------



[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431544842



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp,
+                                Iterable<Header> headers) {
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }

Review comment:
       IIUC, spotbugs complained if these were not here. 
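
       (Presumably the `EQ_DOESNT_OVERRIDE_EQUALS` detector, which flags a subclass that adds state (here `originalRecord`) without overriding `equals()`; the explicit pass-through overrides signal that the inherited semantics are deliberate. The detector name is an educated guess, not confirmed from the build output.)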




----------------------------------------------------------------



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-635085420


   retest this please


----------------------------------------------------------------



[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634908833


   retest this please


----------------------------------------------------------------



[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430865592



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(),
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;
+
+        public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
+            future = producerFuture;
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return future == null || future.isDone();
+        }
+
+        public Void get() throws InterruptedException, ExecutionException {
+            if (future != null) {

Review comment:
       Yeah, that would be the case if I were trying to use the original report functionality with the log reporter. I've since changed the functionality so that I now pass an already-completed future.
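
       For reference, a minimal sketch of the already-completed-future approach (illustrative only; the class and method names here are hypothetical, not the PR's actual code):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;

   public class CompletedFutureSketch {
       // Hypothetical: a report() path that performs no asynchronous work
       // can hand back a future that is already complete, so a caller that
       // immediately invokes get() never blocks.
       static Future<Void> reportLoggedOnly() {
           return CompletableFuture.completedFuture(null);
       }
   }
   ```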







[GitHub] [kafka] aakashnshah commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
aakashnshah commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r430865210



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + " at offset "
+                        + record.kafkaOffset(),
+                    error
+                );

Review comment:
       Same as above.







[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431474116



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+                              int partition, Schema keySchema, Object key, Schema valueSchema,
+                              Object value, long kafkaOffset, Long timestamp,
+                              TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+        this.originalRecord = originalRecord;
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+                                Schema valueSchema, Object value, Long timestamp,
+                                Iterable<Header> headers) {
+        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+            valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "InternalSinkRecord{" +
+                "consumerRecord=" + originalRecord.toString() +
+                "} " + super.toString();

Review comment:
       Hmm, let's just have this delegate to the super method. It's internal, so we need not include the original record details.
   ```suggestion
           return super.toString();
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;

Review comment:
       These can be final.
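
       For illustration, a sketch of how those four fields would read with the suggested change applied (this mirrors the suggestion; it is not the committed code):

   ```java
   private final RetryWithToleranceOperator retryWithToleranceOperator;
   private final Converter keyConverter;
   private final Converter valueConverter;
   private final HeaderConverter headerConverter;
   ```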

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,

Review comment:
       Let's make this protected.
   ```suggestion
       protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    final LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        // Most of the records will be an internal sink record, but the task could potentially
+        // report modified or new records, so handle both cases
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            // Generate a new consumer record from the modified sink record. We prefer
+            // to send the original consumer record (pre-transformed) to the DLQ,
+            // but in this case we don't have one and send the potentially transformed
+            // record instead
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic,
+                record.valueSchema(), record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            int keyLength = key != null ? key.length : -1;
+            int valLength = value != null ? value.length : -1;
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+                valLength, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Awaits the completion of all futures for errant records that this reporter has
+     * sent to Kafka. This function is intended to be used to block until all reported
+     * records have been written.
+     */
+    public void awaitAllFutures() {
+        Future<?> future = null;
+        while ((future = futures.poll()) != null) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while awaiting an errant record future's " +
+                    "completition.");

Review comment:
       Nit: the line break is unnecessary, and there's a misspelling:
   ```suggestion
                   log.error("Encountered an error while awaiting an errant record future's completion.");
   ```
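
       As a usage note, here is a minimal sketch of how a sink task is expected to drive this reporter, following the pattern outlined in KIP-610 (the task class and process() helper are hypothetical; the guarantee that reported records are flushed before offsets are committed comes from the reporter contract, not from this sketch):

   ```java
   import java.util.Collection;
   import java.util.Map;
   import org.apache.kafka.connect.errors.ConnectException;
   import org.apache.kafka.connect.sink.ErrantRecordReporter;
   import org.apache.kafka.connect.sink.SinkRecord;
   import org.apache.kafka.connect.sink.SinkTask;

   public abstract class ReportingSinkTask extends SinkTask {

       private ErrantRecordReporter reporter;

       @Override
       public void start(Map<String, String> props) {
           try {
               // May be null if the user has not enabled error reporting
               reporter = context.errantRecordReporter();
           } catch (NoSuchMethodError | NoClassDefFoundError e) {
               // Running against a pre-2.6 Connect runtime
               reporter = null;
           }
       }

       @Override
       public void put(Collection<SinkRecord> records) {
           for (SinkRecord record : records) {
               try {
                   process(record); // hypothetical per-record handler
               } catch (Exception e) {
                   if (reporter != null) {
                       // Asynchronous; the framework awaits the returned
                       // futures before offsets are committed
                       reporter.report(record, e);
                   } else {
                       throw new ConnectException("Failed to process record", e);
                   }
               }
           }
       }

       protected abstract void process(SinkRecord record);
   }
   ```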



