Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/28 06:50:48 UTC
[kafka] branch trunk updated: KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38c1e96 KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)
38c1e96 is described below
commit 38c1e96d2c2084c7f3c3f9e5309ca91953e7c88f
Author: Aakash Shah <as...@confluent.io>
AuthorDate: Wed May 27 23:49:57 2020 -0700
KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (#8720)
Implementation for KIP-610: https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors. With this change, sink connectors can now report errors at the final stages of the stream that exports records to the sink system.
This PR adds the `ErrantRecordReporter` interface as well as its implementation, `WorkerErrantRecordReporter`. The `WorkerErrantRecordReporter` is created in `Worker` and passed through `WorkerSinkTask` to `WorkerSinkTaskContext`.
An integration test and unit tests have been added.
Reviewers: Lev Zemlyanov <le...@confluent.io>, Greg Harris <gr...@confluent.io>, Chris Egerton <ch...@confluent.io>, Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
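For illustration, here is a minimal task-side sketch (not part of this commit) of how a sink task could use the new API; ExampleSinkTask and its process() helper are hypothetical names:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class ExampleSinkTask extends SinkTask {
        private ErrantRecordReporter reporter;

        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            try {
                // Null if the connector is not configured with a DLQ topic or error log
                reporter = context.errantRecordReporter();
            } catch (NoSuchMethodError | NoClassDefFoundError e) {
                reporter = null; // deployed on a pre-2.6 Connect runtime
            }
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                try {
                    process(record); // hypothetical delivery logic for the sink system
                } catch (Exception e) {
                    if (reporter != null) {
                        reporter.report(record, e); // asynchronous; returns a Future<Void>
                    } else {
                        throw new ConnectException("Failed to process record", e);
                    }
                }
            }
        }

        private void process(SinkRecord record) {
            // write the record to the external system (omitted)
        }

        @Override
        public void stop() {
        }
    }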
---
checkstyle/suppressions.xml | 2 +-
.../kafka/connect/sink/ErrantRecordReporter.java | 53 +++++++
.../apache/kafka/connect/sink/SinkTaskContext.java | 26 ++++
.../kafka/connect/runtime/InternalSinkRecord.java | 82 ++++++++++
.../kafka/connect/runtime/SinkConnectorConfig.java | 5 +
.../org/apache/kafka/connect/runtime/Worker.java | 20 ++-
.../kafka/connect/runtime/WorkerSinkTask.java | 23 ++-
.../connect/runtime/WorkerSinkTaskContext.java | 6 +
.../runtime/errors/DeadLetterQueueReporter.java | 18 ++-
.../connect/runtime/errors/ErrorReporter.java | 9 +-
.../kafka/connect/runtime/errors/LogReporter.java | 11 +-
.../connect/runtime/errors/ProcessingContext.java | 24 ++-
.../runtime/errors/RetryWithToleranceOperator.java | 11 ++
.../runtime/errors/WorkerErrantRecordReporter.java | 167 +++++++++++++++++++++
.../integration/ErrantRecordSinkConnector.java | 61 ++++++++
.../integration/ExampleConnectIntegrationTest.java | 69 +++++++++
.../integration/MonitorableSinkConnector.java | 8 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 2 +-
.../ErrorHandlingTaskWithTopicCreationTest.java | 6 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 2 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 6 +-
.../connect/runtime/errors/ErrorReporterTest.java | 35 +++++
.../errors/RetryWithToleranceOperatorTest.java | 14 ++
.../errors/WorkerErrantRecordReporterTest.java | 74 +++++++++
.../connect/runtime/isolation/PluginUtilsTest.java | 3 +
25 files changed, 710 insertions(+), 27 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d13cd88..0241e6e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -12,7 +12,7 @@
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
<suppress checks="NPathComplexity"
- files="(MessageDataGenerator|FieldSpec).java"/>
+ files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType).java|MessageDataGenerator.java"/>
<suppress checks="MethodLength"
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
new file mode 100644
index 0000000..a20e1e3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Component that a sink task can use to report problematic records and the
+ * corresponding problems as it processes them in {@link SinkTask#put(java.util.Collection)}.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+ /**
+ * Report a problematic record and the corresponding error to be written to the sink
+ * connector's dead letter queue (DLQ).
+ *
+ * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+ * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+ * record has been written or throw any exception that occurred while sending the record.
+ * If you want to simulate a simple blocking call, you can invoke the <code>get()</code>
+ * method immediately.
+ *
+ * Connect guarantees that sink records reported through this reporter will be written to the error topic
+ * before the framework calls the {@link SinkTask#preCommit(java.util.Map)} method and therefore before
+ * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
+ * are required.
+ *
+ * @param record the problematic record; may not be null
+ * @param error the error capturing the problem with the record; may not be null
+ * @return a future that can be used to block until the record and error are reported
+ * to the DLQ
+ * @throws ConnectException if the error reporter or DLQ fails to write a reported record
+ * @since 2.6
+ */
+ Future<Void> report(SinkRecord record, Throwable error);
+}
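A task that needs synchronous semantics can block on the returned future immediately, as a sketch (the reporter, record, and error variables are assumed to come from the task's put() loop):

    Future<Void> future = reporter.report(record, error);
    try {
        future.get(); // blocks until the record and error have been written to the DLQ
    } catch (InterruptedException | ExecutionException e) {
        throw new ConnectException("Failed to report errant record", e);
    }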
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 340ef80..a431dbe 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -95,4 +95,30 @@ public interface SinkTaskContext {
*/
void requestCommit();
+ /**
+ * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records}
+ * passed to the {@link SinkTask#put(java.util.Collection)} method. When reporting a failed record,
+ * the sink task will receive a {@link java.util.concurrent.Future} that the task can optionally use to wait until
+ * the failed record and exception have been written to Kafka. Note that the result of
+ * this method may be null if this connector has not been configured to use a reporter.
+ *
+ * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
+ * maintain backward compatibility so they can also be deployed to older Connect runtimes
+ * should guard the call to this method with a try-catch block, since calling this method will result in a
+ * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the sink connector is deployed to
+ * Connect runtimes older than Kafka 2.6. For example:
+ * <pre>
+ * ErrantRecordReporter reporter;
+ * try {
+ * reporter = context.errantRecordReporter();
+ * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+ * reporter = null;
+ * }
+ * </pre>
+ *
+ * @return the reporter; null if no error reporter has been configured for the connector
+ * @since 2.6
+ */
+ ErrantRecordReporter errantRecordReporter();
+
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
new file mode 100644
index 0000000..69554ff
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * A specialization of {@link SinkRecord} that allows a {@link WorkerSinkTask} to track the
+ * original {@link ConsumerRecord} for each {@link SinkRecord}. It is used internally and not
+ * exposed to connectors.
+ */
+public class InternalSinkRecord extends SinkRecord {
+
+ private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+ public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+ super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+ record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+ record.timestampType(), record.headers());
+ this.originalRecord = originalRecord;
+ }
+
+ protected InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, String topic,
+ int partition, Schema keySchema, Object key, Schema valueSchema,
+ Object value, long kafkaOffset, Long timestamp,
+ TimestampType timestampType, Iterable<Header> headers) {
+ super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
+ this.originalRecord = originalRecord;
+ }
+
+ @Override
+ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
+ Schema valueSchema, Object value, Long timestamp,
+ Iterable<Header> headers) {
+ return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
+ valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
+
+ /**
+ * Return the original consumer record that this sink record represents.
+ *
+ * @return the original consumer record; never null
+ */
+ public ConsumerRecord<byte[], byte[]> originalRecord() {
+ return originalRecord;
+ }
+}
+
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 0672f4e..71ba9c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -120,6 +120,11 @@ public class SinkConnectorConfig extends ConnectorConfig {
return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
}
+ public boolean enableErrantRecordReporter() {
+ String dlqTopic = dlqTopicName();
+ return !dlqTopic.isEmpty() || enableErrorLog();
+ }
+
public static void main(String[] args) {
System.out.println(config.toHtml());
}
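In other words, the reporter is only created when reported errors have somewhere to go: a non-empty errors.deadletterqueue.topic.name or errors.log.enable=true. A sketch of connector properties that would enable it (the connector class and topic names are assumptions):

    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "com.example.MySinkConnector"); // hypothetical connector
    props.put("topics", "test-topic");
    props.put("errors.deadletterqueue.topic.name", "my-dlq-topic"); // enables the reporter
    // alternatively: props.put("errors.log.enable", "true");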
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 28b1149..26b0444 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -557,13 +558,15 @@ public class Worker {
log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+ WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+ keyConverter, valueConverter, headerConverter);
Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, headerConverter, transformationChain, consumer, loader, time,
- retryWithToleranceOperator, herder.statusBackingStore());
+ retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -718,6 +721,7 @@ public class Worker {
connectorClientConfigOverridePolicy, kafkaClusterId);
Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
+
reporters.add(reporter);
}
@@ -733,6 +737,20 @@ public class Worker {
return reporters;
}
+ private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+ SinkConnectorConfig connConfig,
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter
+ ) {
+ // enable the errant record reporter only if a DLQ topic is configured or the error log is enabled
+ if (connConfig.enableErrantRecordReporter()) {
+ return new WorkerErrantRecordReporter(retryWithToleranceOperator, keyConverter, valueConverter, headerConverter);
+ }
+ return null;
+ }
+
private void stopTask(ConnectorTaskId taskId) {
try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
WorkerTask task = tasks.get(taskId);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3df9451..8c4a496 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
@@ -94,6 +95,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
+ private final WorkerErrantRecordReporter workerErrantRecordReporter;
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
@@ -110,6 +112,7 @@ class WorkerSinkTask extends WorkerTask {
ClassLoader loader,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator,
+ WorkerErrantRecordReporter workerErrantRecordReporter,
StatusBackingStore statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);
@@ -136,6 +139,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.workerErrantRecordReporter = workerErrantRecordReporter;
}
@Override
@@ -360,6 +364,12 @@ class WorkerSinkTask extends WorkerTask {
}
private void commitOffsets(long now, boolean closing) {
+ if (workerErrantRecordReporter != null) {
+ log.trace("Awaiting all reported errors to be completed");
+ workerErrantRecordReporter.awaitAllFutures();
+ log.trace("Completed all reported errors");
+ }
+
if (currentOffsets.isEmpty())
return;
@@ -502,7 +512,14 @@ class WorkerSinkTask extends WorkerTask {
if (isTopicTrackingEnabled) {
recordActiveTopic(origRecord.topic());
}
- return transformationChain.apply(origRecord);
+
+ // Apply the transformations
+ SinkRecord transformedRecord = transformationChain.apply(origRecord);
+ if (transformedRecord == null) {
+ return null;
+ }
+ // Error reporting will need to correlate each sink record with the original consumer record
+ return new InternalSinkRecord(msg, transformedRecord);
}
private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
@@ -518,6 +535,10 @@ class WorkerSinkTask extends WorkerTask {
return result;
}
+ protected WorkerErrantRecordReporter workerErrantRecordReporter() {
+ return workerErrantRecordReporter;
+ }
+
private void resumeAll() {
for (TopicPartition tp : consumer.assignment())
if (!context.pausedPartitions().contains(tp))
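Since commitOffsets() now awaits every outstanding reported future before consumer offsets are committed, tasks get KIP-610's ordering guarantee without extra work. A task that wants to enforce the guarantee itself could retain the futures and drain them in preCommit(), as a sketch (pendingReports is an assumed List<Future<Void>> field in the task, populated with the values returned by report(); imports for TopicPartition, OffsetAndMetadata, Future, and the exception types are assumed):

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        for (Future<Void> future : pendingReports) {
            try {
                future.get(); // ensure each reported record reached the DLQ
            } catch (InterruptedException | ExecutionException e) {
                throw new ConnectException(e);
            }
        }
        pendingReports.clear();
        return super.preCommit(currentOffsets);
    }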
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 3a6b0d6..724b02e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,6 +160,11 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
}
@Override
+ public ErrantRecordReporter errantRecordReporter() {
+ return sinkTask.workerErrantRecordReporter();
+ }
+
+ @Override
public String toString() {
return "WorkerSinkTaskContext{" +
"id=" + sinkTask.id +
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 20ed2f2..e07a5d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.RecordBatch;
@@ -36,7 +37,9 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import static java.util.Collections.singleton;
@@ -67,6 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private final SinkConnectorConfig connConfig;
private final ConnectorTaskId connectorTaskId;
private final ErrorHandlingMetrics errorHandlingMetrics;
+ private final String dlqTopicName;
private KafkaProducer<byte[], byte[]> kafkaProducer;
@@ -111,25 +115,25 @@ public class DeadLetterQueueReporter implements ErrorReporter {
this.connConfig = connConfig;
this.connectorTaskId = id;
this.errorHandlingMetrics = errorHandlingMetrics;
+ this.dlqTopicName = connConfig.dlqTopicName().trim();
}
/**
- * Write the raw records into a Kafka topic.
+ * Write the raw records into a Kafka topic and return the producer future.
*
* @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+ * @return the future associated with the writing of this record; never null
*/
- public void report(ProcessingContext context) {
- final String dlqTopicName = connConfig.dlqTopicName();
+ public Future<RecordMetadata> report(ProcessingContext context) {
if (dlqTopicName.isEmpty()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
-
errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
if (originalMessage == null) {
errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
- return;
+ return CompletableFuture.completedFuture(null);
}
ProducerRecord<byte[], byte[]> producerRecord;
@@ -145,7 +149,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
populateContextHeaders(producerRecord, context);
}
- this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
+ return this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5eaa427..f9bc2f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -16,17 +16,22 @@
*/
package org.apache.kafka.connect.runtime.errors;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.concurrent.Future;
+
/**
* Report an error using the information contained in the {@link ProcessingContext}.
*/
public interface ErrorReporter extends AutoCloseable {
/**
- * Report an error.
+ * Report an error and return the producer future.
*
* @param context the processing context (cannot be null).
+ * @return future result from the producer sending a record to Kafka.
*/
- void report(ProcessingContext context);
+ Future<RecordMetadata> report(ProcessingContext context);
@Override
default void close() { }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
index 8b07adf..cf9db2c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -16,12 +16,15 @@
*/
package org.apache.kafka.connect.runtime.errors;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
/**
* Writes errors and their context to application logs.
@@ -29,6 +32,7 @@ import java.util.Objects;
public class LogReporter implements ErrorReporter {
private static final Logger log = LoggerFactory.getLogger(LogReporter.class);
+ private static final Future<RecordMetadata> COMPLETED = CompletableFuture.completedFuture(null);
private final ConnectorTaskId id;
private final ConnectorConfig connConfig;
@@ -50,17 +54,18 @@ public class LogReporter implements ErrorReporter {
* @param context the processing context.
*/
@Override
- public void report(ProcessingContext context) {
+ public Future<RecordMetadata> report(ProcessingContext context) {
if (!connConfig.enableErrorLog()) {
- return;
+ return COMPLETED;
}
if (!context.failed()) {
- return;
+ return COMPLETED;
}
log.error(message(context), context.error());
errorHandlingMetrics.recordErrorLogged();
+ return COMPLETED;
}
// Visible for testing
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index e7fb031..0ddf894 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -17,13 +17,20 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
/**
* Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
@@ -132,11 +139,22 @@ class ProcessingContext implements AutoCloseable {
/**
* Report errors. Should be called only if an error was encountered while executing the operation.
+ *
+ * @return an errant record future that potentially aggregates the producer futures
*/
- public void report() {
- for (ErrorReporter reporter: reporters) {
- reporter.report(this);
+ public Future<Void> report() {
+ if (reporters.size() == 1) {
+ return new ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(this)));
+ }
+
+ List<Future<RecordMetadata>> futures = reporters.stream()
+ .map(r -> r.report(this))
+ .filter(future -> !future.isDone()) // keep only futures that still need to be awaited
+ .collect(Collectors.toCollection(LinkedList::new));
+ if (futures.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
}
+ return new ErrantRecordFuture(futures);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 4e627ef..ff1702d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -83,6 +84,16 @@ public class RetryWithToleranceOperator implements AutoCloseable {
this.time = time;
}
+ public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+ ConsumerRecord<byte[], byte[]> consumerRecord,
+ Throwable error) {
+ context.consumerRecord(consumerRecord);
+ context.currentContext(stage, executingClass);
+ context.error(error);
+ errorHandlingMetrics.recordError();
+ return context.report();
+ }
+
/**
* Execute the recoverable operation. If the operation is already in a failed state, then simply return
* with the existing failure.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
new file mode 100644
index 0000000..6e1fa53
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+ private final RetryWithToleranceOperator retryWithToleranceOperator;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ private final HeaderConverter headerConverter;
+
+ // Visible for testing
+ protected final LinkedList<Future<Void>> futures;
+
+ public WorkerErrantRecordReporter(
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ Converter keyConverter,
+ Converter valueConverter,
+ HeaderConverter headerConverter
+ ) {
+ this.retryWithToleranceOperator = retryWithToleranceOperator;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ this.headerConverter = headerConverter;
+ this.futures = new LinkedList<>();
+ }
+
+ @Override
+ public Future<Void> report(SinkRecord record, Throwable error) {
+ ConsumerRecord<byte[], byte[]> consumerRecord;
+
+ // Most of the records will be an internal sink record, but the task could potentially
+ // report modified or new records, so handle both cases
+ if (record instanceof InternalSinkRecord) {
+ consumerRecord = ((InternalSinkRecord) record).originalRecord();
+ } else {
+ // Generate a new consumer record from the modified sink record. We prefer
+ // to send the original consumer record (pre-transformed) to the DLQ,
+ // but in this case we don't have one and send the potentially transformed
+ // record instead
+ String topic = record.topic();
+ byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+ byte[] value = valueConverter.fromConnectData(topic,
+ record.valueSchema(), record.value());
+
+ RecordHeaders headers = new RecordHeaders();
+ if (record.headers() != null) {
+ for (Header header : record.headers()) {
+ String headerKey = header.key();
+ byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+ header.schema(), header.value());
+ headers.add(headerKey, rawHeader);
+ }
+ }
+
+ int keyLength = key != null ? key.length : -1;
+ int valLength = value != null ? value.length : -1;
+
+ consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+ record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
+ valLength, key, value, headers);
+ }
+
+ Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+ SinkTask.class, consumerRecord, error);
+
+ if (!future.isDone()) {
+ futures.add(future);
+ }
+ return future;
+ }
+
+ /**
+ * Awaits the completion of all futures for errant records reported to Kafka by this
+ * reporter. This method is intended to be used to block on all outstanding errant
+ * record futures.
+ */
+ public void awaitAllFutures() {
+ Future<?> future;
+ while ((future = futures.poll()) != null) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Encountered an error while awaiting an errant record future's completion.");
+ throw new ConnectException(e);
+ }
+ }
+ }
+
+ /**
+ * Wrapper class to aggregate producer futures and abstract away the record metadata from the
+ * Connect user.
+ */
+ public static class ErrantRecordFuture implements Future<Void> {
+
+ private final List<Future<RecordMetadata>> futures;
+
+ public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
+ futures = producerFutures;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
+ }
+
+ public boolean isCancelled() {
+ return false;
+ }
+
+ public boolean isDone() {
+ return futures.stream().allMatch(Future::isDone);
+ }
+
+ public Void get() throws InterruptedException, ExecutionException {
+ for (Future<RecordMetadata> future: futures) {
+ future.get();
+ }
+ return null;
+ }
+
+ public Void get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ for (Future<RecordMetadata> future: futures) {
+ future.get(timeout, unit);
+ }
+ return null;
+ }
+ }
+}
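To make the aggregation semantics concrete, a small self-contained sketch of how an ErrantRecordFuture behaves over its wrapped producer futures (the setup and values are assumptions):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;

    public class ErrantRecordFutureSketch {
        public static void main(String[] args) throws Exception {
            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
            producerFutures.add(CompletableFuture.completedFuture(null)); // e.g. the log reporter
            CompletableFuture<RecordMetadata> dlqWrite = new CompletableFuture<>();
            producerFutures.add(dlqWrite); // e.g. an in-flight DLQ produce

            ErrantRecordFuture aggregate = new ErrantRecordFuture(producerFutures);
            System.out.println(aggregate.isDone()); // false: the DLQ write is outstanding
            dlqWrite.complete(null);
            System.out.println(aggregate.isDone()); // true: all wrapped futures are done
            aggregate.get(); // returns null without blocking
        }
    }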
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
new file mode 100644
index 0000000..0fe2f88
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return ErrantRecordSinkTask.class;
+ }
+
+ public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+ private ErrantRecordReporter reporter;
+
+ public ErrantRecordSinkTask() {
+ super();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ super.start(props);
+ reporter = context.errantRecordReporter();
+ }
+
+ @Override
+ public void put(Collection<SinkRecord> records) {
+ for (SinkRecord rec : records) {
+ taskHandle.record();
+ TopicPartition tp = cachedTopicPartitions
+ .computeIfAbsent(rec.topic(), v -> new HashMap<>())
+ .computeIfAbsent(rec.kafkaPartition(), v -> new TopicPartition(rec.topic(), rec.kafkaPartition()));
+ committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0L) + 1);
+ reporter.report(rec, new Throwable());
+ }
+ }
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 8538fb4..a2b40e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
@@ -65,6 +66,9 @@ public class ExampleConnectIntegrationTest {
private static final String CONNECTOR_NAME = "simple-conn";
private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
+ private static final String DLQ_TOPIC = "dlq-topic";
+ private static final String ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME =
+ ErrantRecordSinkConnector.class.getSimpleName();
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@@ -219,6 +223,71 @@ public class ExampleConnectIntegrationTest {
connect.deleteConnector(CONNECTOR_NAME);
}
+ @Test
+ public void testErrantRecordReporter() throws Exception {
+ connect.kafka().createTopic(DLQ_TOPIC, 1);
+ // create test topic
+ connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+ // set up props for the sink connector
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPICS_CONFIG, "test-topic");
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+
+ // expect all records to be consumed by the connector
+ connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+ // expect all records to be committed by the connector
+ connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+ // validate the intended connector configuration, a config that errors
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
+ "Validating connector configuration produced an unexpected number or errors.");
+
+ // add missing configuration to make the config valid
+ props.put("name", CONNECTOR_NAME);
+
+ // validate the intended connector configuration, a valid config
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
+ "Validating connector configuration produced an unexpected number or errors.");
+
+ // start a sink connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ waitForCondition(this::checkForPartitionAssignment,
+ CONNECTOR_SETUP_DURATION_MS,
+ "Connector tasks were not assigned a partition each.");
+
+ // produce some messages into source topic partitions
+ for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+ connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+ }
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced.
+ assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+ connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
+
+ // wait for the connector tasks to consume all records.
+ connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+ // wait for the connector tasks to commit all records.
+ connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
+
+ // consume all records from the dlq topic or fail, to ensure that they were correctly produced
+ int recordNum = connect.kafka().consume(
+ NUM_RECORDS_PRODUCED,
+ RECORD_TRANSFER_DURATION_MS,
+ DLQ_TOPIC
+ ).count();
+
+ // delete connector
+ connect.deleteConnector(CONNECTOR_NAME);
+ }
+
/**
* Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
* {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
index 05b2dfd..9bd0756 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -90,10 +90,10 @@ public class MonitorableSinkConnector extends TestSinkConnector {
private String connectorName;
private String taskId;
- private TaskHandle taskHandle;
- private Set<TopicPartition> assignments;
- private Map<TopicPartition, Long> committedOffsets;
- private Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions;
+ TaskHandle taskHandle;
+ Set<TopicPartition> assignments;
+ Map<TopicPartition, Long> committedOffsets;
+ Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions;
public MonitorableSinkTask() {
this.assignments = new HashSet<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 4434904..c471b03 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -538,7 +538,7 @@ public class ErrorHandlingTaskTest {
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter,
headerConverter, sinkTransforms, consumer, pluginLoader, time,
- retryWithToleranceOperator, statusBackingStore);
+ retryWithToleranceOperator, null, statusBackingStore);
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 909ec4a..aba6445 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -161,6 +162,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
@SuppressWarnings("unused")
@Mock private StatusBackingStore statusBackingStore;
+ @Mock
+ private WorkerErrantRecordReporter workerErrantRecordReporter;
+
private ErrorHandlingMetrics errorHandlingMetrics;
// when this test becomes parameterized, this variable will be a test parameter
@@ -548,7 +552,7 @@ public class ErrorHandlingTaskWithTopicCreationTest {
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter,
headerConverter, sinkTransforms, consumer, pluginLoader, time,
- retryWithToleranceOperator, statusBackingStore);
+ retryWithToleranceOperator, workerErrantRecordReporter, statusBackingStore);
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5dc2f44..12a553f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -183,7 +183,7 @@ public class WorkerSinkTaskTest {
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, headerConverter,
transformationChain, consumer, pluginLoader, time,
- RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
}
@After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index be2584a..7a82b20 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -144,7 +144,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
valueConverter, headerConverter,
new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
- consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+ consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
recordsReturned = 0;
}
@@ -187,7 +187,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
for (SinkRecord rec : recs) {
SinkRecord referenceSinkRecord
= new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
- assertEquals(referenceSinkRecord, rec);
+ InternalSinkRecord referenceInternalSinkRecord =
+ new InternalSinkRecord(null, referenceSinkRecord);
+ assertEquals(referenceInternalSinkRecord, rec);
offset++;
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f01cd49..85f6948 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -43,6 +43,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import static java.util.Collections.emptyMap;
@@ -147,6 +148,21 @@ public class ErrorReporterTest {
}
@Test
+ public void testDLQReportAndReturnFuture() {
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+ producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+ ProcessingContext context = processingContext();
+
+ EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
+ replay(producer);
+
+ deadLetterQueueReporter.report(context);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testCloseDLQ() {
DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
@@ -212,6 +228,25 @@ public class ErrorReporterTest {
}
@Test
+ public void testLogReportAndReturnFuture() {
+ Map<String, String> props = new HashMap<>();
+ props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+ props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+ LogReporter logReporter = new LogReporter(TASK_ID, config(props), errorHandlingMetrics);
+
+ ProcessingContext context = processingContext();
+
+ String msg = logReporter.message(context);
+ assertEquals("Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class " +
+ "'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test-topic', " +
+ "partition=5, offset=100}.", msg);
+
+ Future<RecordMetadata> future = logReporter.report(context);
+ assertTrue(future instanceof CompletableFuture);
+ }
+
+ @Test
public void testSetDLQConfigs() {
SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 43bac54..3aed19d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
@@ -88,12 +89,25 @@ public class RetryWithToleranceOperatorTest {
private Operation<String> mockOperation;
@Mock
+ private ConsumerRecord<byte[], byte[]> consumerRecord;
+
+ @Mock
ErrorHandlingMetrics errorHandlingMetrics;
@Mock
Plugins plugins;
@Test
+ public void testExecuteFailed() {
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+ ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+ retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+ SinkTask.class, consumerRecord, new Throwable());
+ }
+
+ @Test
public void testHandleExceptionInTransformations() {
testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
new file mode 100644
index 0000000..07a4f9e
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerErrantRecordReporterTest {
+
+ private WorkerErrantRecordReporter reporter;
+
+ @Mock
+ private RetryWithToleranceOperator retryWithToleranceOperator;
+
+ @Mock
+ private Converter converter;
+
+ @Mock
+ private HeaderConverter headerConverter;
+
+ @Mock
+ private SinkRecord record;
+
+ @Before
+ public void setup() {
+ reporter = new WorkerErrantRecordReporter(
+ retryWithToleranceOperator,
+ converter,
+ converter,
+ headerConverter
+ );
+ }
+
+ @Test
+ public void testGetAllFutures() {
+ assertTrue(reporter.futures.isEmpty());
+ for (int i = 0; i < 4; i++) {
+ reporter.futures.add(CompletableFuture.completedFuture(null));
+ }
+ assertFalse(reporter.futures.isEmpty());
+ reporter.awaitAllFutures();
+ assertTrue(reporter.futures.isEmpty());
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index c406ead..4baaa6d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -117,6 +117,9 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.rest.ConnectRestExtension")
);
+ assertFalse(PluginUtils.shouldLoadInIsolation(
+ "org.apache.kafka.connect.sink.ErrantRecordReporter"
+ ));
}
@Test