Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/30 18:39:59 UTC

[kafka] branch trunk updated: KAFKA-6738: Implement error handling for source and sink tasks (KIP-298)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8dfbb0  KAFKA-6738: Implement error handling for source and sink tasks (KIP-298)
f8dfbb0 is described below

commit f8dfbb067caa797c19494e12da6b4c6786980f73
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Wed May 30 11:39:45 2018 -0700

    KAFKA-6738: Implement error handling for source and sink tasks (KIP-298)
    
    This PR implements the features described in this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
    
    This PR changes the Connect framework to allow it to automatically deal with errors encountered while processing records in a Connector. The following behavior changes are introduced here:
    
    **Retry on Failure**: Retry the failed operation a configurable number of times, with backoff between each retry.
    **Task Tolerance Limits**: Tolerate a configurable number of failures in a task.
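
    For illustration only, a hedged sketch of how a connector might opt into these behaviors, using the property names from KIP-298 (the values are placeholders, not recommendations):

        errors.retry.timeout=30000        # total time to keep retrying a failed operation, in ms
        errors.retry.delay.max.ms=1000    # maximum backoff between consecutive retries, in ms
        errors.tolerance=all              # tolerate (skip) bad records instead of failing the task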
    
    We also add the following ways to report errors, along with sufficient context to simplify the debugging process:
    
    **Log Error Context**: The error information, together with the processing context, is written to the standard application logs.
    **Dead Letter Queue**: Produce the original message into a Kafka topic (applicable only to sink connectors).
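
    A minimal sketch of the reporter configuration, using the "errors.log" and "errors.deadletterqueue" prefixes introduced by this patch (the topic name below is a placeholder):

        errors.log.enable=true                                    # log errors and their processing context
        errors.log.include.messages=true                          # also log the record key/value details
        errors.deadletterqueue.topic.name=my-connector-errors     # sink connectors only; empty disables the DLQ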
    
    New **metrics** are added to monitor the number of failures and the behavior of the response handler.
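
    As a sketch, the new sensors live in the "task-error-metrics" group, tagged by connector and task, so they should surface via JMX roughly as follows (assuming the standard "kafka.connect" JMX domain used by Connect workers):

        kafka.connect:type=task-error-metrics,connector=<connector>,task=<task>
        attributes: total-record-failures, total-record-errors, total-records-skipped, total-retries,
                    total-errors-logged, deadletterqueue-produce-requests, deadletterqueue-produce-failures,
                    last-error-timestamp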
    
    The changes proposed here **are backward compatible**. The current behavior in Connect is to kill the task on the first error in any stage; this remains the default behavior if the connector does not override any of the new configurations provided as part of this feature.
    
    Testing: added multiple unit tests covering the retry and tolerance logic.
    
    Author: Arjun Satish <ar...@confluent.io>
    Author: Ewen Cheslack-Postava <me...@ewencp.org>
    
    Reviewers: Magesh Nandakumar <ma...@gmail.com>, Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5065 from wicknicks/KAFKA-6378
---
 .../kafka/connect/runtime/ConnectMetrics.java      |  17 +-
 .../connect/runtime/ConnectMetricsRegistry.java    |  35 ++
 .../kafka/connect/runtime/TransformationChain.java |  19 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  58 ++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  67 +--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  59 ++-
 .../apache/kafka/connect/runtime/WorkerTask.java   |   7 +-
 .../runtime/errors/DeadLetterQueueReporter.java    | 154 +++++++
 .../runtime/errors/ErrorHandlingMetrics.java       | 150 +++++++
 .../connect/runtime/errors/ErrorReporter.java      |  41 ++
 .../kafka/connect/runtime/errors/LogReporter.java  | 111 +++++
 .../kafka/connect/runtime/errors/Operation.java    |  28 ++
 .../connect/runtime/errors/ProcessingContext.java  | 219 ++++++++++
 .../runtime/errors/RetryWithToleranceOperator.java | 328 ++++++++++++++
 .../apache/kafka/connect/runtime/errors/Stage.java |  63 +++
 .../connect/runtime/errors/ToleranceType.java      |  39 ++
 .../connect/runtime/ErrorHandlingTaskTest.java     | 474 +++++++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |  10 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   5 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   4 +-
 .../kafka/connect/runtime/WorkerTaskTest.java      |  18 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |  18 +-
 .../connect/runtime/errors/ErrorReporterTest.java  | 198 +++++++++
 .../errors/RetryWithToleranceOperatorTest.java     | 308 +++++++++++++
 24 files changed, 2356 insertions(+), 74 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 5bbe148..d1e97b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -64,15 +64,20 @@ public class ConnectMetrics {
      * @param time     the time; may not be null
      */
     public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+        this(workerId, time, config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG),
+                config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG),
+                config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class));
+    }
+
+    public ConnectMetrics(String workerId, Time time, int numSamples, long sampleWindowMs, String metricsRecordingLevel,
+                          List<MetricsReporter> reporters) {
         this.workerId = workerId;
         this.time = time;
 
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                                                      .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS).recordLevel(
-                        Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
-        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        MetricsReporter.class);
+        MetricConfig metricConfig = new MetricConfig().samples(numSamples)
+                .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
+                        Sensor.RecordingLevel.forName(metricsRecordingLevel));
         reporters.add(new JmxReporter(JMX_PREFIX));
         this.metrics = new Metrics(metricConfig, reporters, time);
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index d78576e..04699ea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -34,6 +34,7 @@ public class ConnectMetricsRegistry {
     public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
     public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
     public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics";
+    public static final String TASK_ERROR_HANDLING_GROUP_NAME = "task-error-metrics";
 
     private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
     public final MetricNameTemplate connectorStatus;
@@ -93,6 +94,14 @@ public class ConnectMetricsRegistry {
     public final MetricNameTemplate rebalanceTimeMax;
     public final MetricNameTemplate rebalanceTimeAvg;
     public final MetricNameTemplate rebalanceTimeSinceLast;
+    public final MetricNameTemplate recordProcessingFailures;
+    public final MetricNameTemplate recordProcessingErrors;
+    public final MetricNameTemplate recordsSkipped;
+    public final MetricNameTemplate retries;
+    public final MetricNameTemplate errorsLogged;
+    public final MetricNameTemplate dlqProduceRequests;
+    public final MetricNameTemplate dlqProduceFailures;
+    public final MetricNameTemplate lastErrorTimestamp;
 
     public ConnectMetricsRegistry() {
         this(new LinkedHashSet<String>());
@@ -294,6 +303,28 @@ public class ConnectMetricsRegistry {
                                           "The average time in milliseconds spent by this worker to rebalance.", rebalanceTags);
         rebalanceTimeSinceLast = createTemplate("time-since-last-rebalance-ms", WORKER_REBALANCE_GROUP_NAME,
                                                 "The time in milliseconds since this worker completed the most recent rebalance.", rebalanceTags);
+
+        /***** Task Error Handling Metrics *****/
+        Set<String> taskErrorHandlingTags = new LinkedHashSet<>(tags);
+        taskErrorHandlingTags.add(CONNECTOR_TAG_NAME);
+        taskErrorHandlingTags.add(TASK_TAG_NAME);
+
+        recordProcessingFailures = createTemplate("total-record-failures", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of record processing failures in this task.", taskErrorHandlingTags);
+        recordProcessingErrors = createTemplate("total-record-errors", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of record processing errors in this task. ", taskErrorHandlingTags);
+        recordsSkipped = createTemplate("total-records-skipped", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of records skipped due to errors.", taskErrorHandlingTags);
+        retries = createTemplate("total-retries", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of operations retried.", taskErrorHandlingTags);
+        errorsLogged = createTemplate("total-errors-logged", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of errors that were logged.", taskErrorHandlingTags);
+        dlqProduceRequests = createTemplate("deadletterqueue-produce-requests", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of attempted writes to the dead letter queue.", taskErrorHandlingTags);
+        dlqProduceFailures = createTemplate("deadletterqueue-produce-failures", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of failed writes to the dead letter queue.", taskErrorHandlingTags);
+        lastErrorTimestamp = createTemplate("last-error-timestamp", TASK_ERROR_HANDLING_GROUP_NAME,
+                "The epoch timestamp when this task last encountered an error.", taskErrorHandlingTags);
     }
 
     private MetricNameTemplate createTemplate(String name, String group, String doc, Set<String> tags) {
@@ -337,4 +368,8 @@ public class ConnectMetricsRegistry {
     public String workerRebalanceGroupName() {
         return WORKER_REBALANCE_GROUP_NAME;
     }
+
+    public String taskErrorHandlingGroupName() {
+        return TASK_ERROR_HANDLING_GROUP_NAME;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index e1d8b1f..3680905 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -17,25 +17,32 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.transforms.Transformation;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 public class TransformationChain<R extends ConnectRecord<R>> {
 
     private final List<Transformation<R>> transformations;
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
 
-    public TransformationChain(List<Transformation<R>> transformations) {
+    public TransformationChain(List<Transformation<R>> transformations, RetryWithToleranceOperator retryWithToleranceOperator) {
         this.transformations = transformations;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
 
     public R apply(R record) {
         if (transformations.isEmpty()) return record;
 
-        for (Transformation<R> transformation : transformations) {
-            record = transformation.apply(record);
+        for (final Transformation<R> transformation : transformations) {
+            final R current = record;
+
+            // execute the operation
+            record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
+
             if (record == null) break;
         }
 
@@ -61,8 +68,4 @@ public class TransformationChain<R extends ConnectRecord<R>> {
         return Objects.hash(transformations);
     }
 
-    public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() {
-        return new TransformationChain<R>(Collections.<Transformation<R>>emptyList());
-    }
-
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c64658..c58eddf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -30,6 +30,11 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.LogReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -448,26 +453,71 @@ public class Worker {
                                        Converter valueConverter,
                                        HeaderConverter headerConverter,
                                        ClassLoader loader) {
+        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(connConfig.originalsWithPrefix("errors."));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
-            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
+                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader,
+                    time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics));
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, loader, time);
+                    valueConverter, headerConverter, transformationChain, loader, time,
+                    retryWithToleranceOperator);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
     }
 
+    ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
+        return new ErrorHandlingMetrics(id, metrics);
+    }
+
+    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
+                                                    ErrorHandlingMetrics errorHandlingMetrics) {
+        ArrayList<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(id);
+        logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        logReporter.metrics(errorHandlingMetrics);
+        reporters.add(logReporter);
+
+        // check if topic for dead letter queue exists
+        String topic = connConfig.getString(DeadLetterQueueReporter.PREFIX + "." + DeadLetterQueueReporter.DLQ_TOPIC_NAME);
+        if (topic != null && !topic.isEmpty()) {
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+            reporter.configure(connConfig.originalsWithPrefix(DeadLetterQueueReporter.PREFIX + "."));
+            reporters.add(reporter);
+        }
+
+        return reporters;
+    }
+
+    private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
+                                                      ErrorHandlingMetrics errorHandlingMetrics) {
+        List<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(id);
+        logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        logReporter.metrics(errorHandlingMetrics);
+        reporters.add(logReporter);
+
+        return reporters;
+    }
+
     private void stopTask(ConnectorTaskId taskId) {
         WorkerTask task = tasks.get(taskId);
         if (task == null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index e629798..3296007 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,6 +40,8 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -100,8 +102,9 @@ class WorkerSinkTask extends WorkerTask {
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
                           ClassLoader loader,
-                          Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                          Time time,
+                          RetryWithToleranceOperator retryWithToleranceOperator) {
+        super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -477,42 +480,52 @@ class WorkerSinkTask extends WorkerTask {
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
-            SchemaAndValue keyAndSchema = toConnectData(keyConverter, "key", msg, msg.key());
-            SchemaAndValue valueAndSchema = toConnectData(valueConverter, "value", msg, msg.value());
-            Headers headers = convertHeadersFor(msg);
-            Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
-            SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
-                    keyAndSchema.schema(), keyAndSchema.value(),
-                    valueAndSchema.schema(), valueAndSchema.value(),
-                    msg.offset(),
-                    timestamp,
-                    msg.timestampType(),
-                    headers);
-            log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
-            SinkRecord transRecord = transformationChain.apply(origRecord);
+
+            retryWithToleranceOperator.consumerRecord(msg);
+
+            SinkRecord transRecord = convertAndTransformRecord(msg);
+
             origOffsets.put(
-                    new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
-                    new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+                    new TopicPartition(msg.topic(), msg.partition()),
+                    new OffsetAndMetadata(msg.offset() + 1)
             );
             if (transRecord != null) {
                 messageBatch.add(transRecord);
             } else {
-                log.trace("{} Transformations returned null, so dropping record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
-                        this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
+                log.trace(
+                        "{} Converters and transformations returned null, possibly because of too many retries, so " +
+                                "dropping record in topic '{}' partition {} at offset {}",
+                        this, msg.topic(), msg.partition(), msg.offset()
+                );
             }
         }
         sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
     }
 
-    private SchemaAndValue toConnectData(Converter converter, String converterName, ConsumerRecord<byte[], byte[]> msg, byte[] data) {
-        try {
-            return converter.toConnectData(msg.topic(), data);
-        } catch (Throwable e) {
-            String str = String.format("Error converting message %s in topic '%s' partition %d at offset %d and timestamp %d",
-                    converterName, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
-            throw new ConnectException(str, e);
+    private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
+        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(msg.topic(), msg.key()),
+                Stage.KEY_CONVERTER, keyConverter.getClass());
+
+        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(msg.topic(), msg.value()),
+                Stage.VALUE_CONVERTER, valueConverter.getClass());
+
+        Headers headers = retryWithToleranceOperator.execute(() -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverter.getClass());
+
+        if (retryWithToleranceOperator.failed()) {
+            return null;
         }
+
+        Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
+        SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
+                keyAndSchema.schema(), keyAndSchema.value(),
+                valueAndSchema.schema(), valueAndSchema.value(),
+                msg.offset(),
+                timestamp,
+                msg.timestampType(),
+                headers);
+        log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
+                this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
+        return transformationChain.apply(origRecord);
     }
 
     private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f17475d..e7b92a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -34,6 +34,8 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -103,8 +105,10 @@ class WorkerSourceTask extends WorkerTask {
                             WorkerConfig workerConfig,
                             ConnectMetrics connectMetrics,
                             ClassLoader loader,
-                            Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                            Time time,
+                            RetryWithToleranceOperator retryWithToleranceOperator) {
+
+        super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -209,7 +213,7 @@ class WorkerSourceTask extends WorkerTask {
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     long start = time.milliseconds();
-                    toSend = task.poll();
+                    toSend = poll();
                     if (toSend != null) {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
@@ -231,6 +235,44 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    protected List<SourceRecord> poll() throws InterruptedException {
+        try {
+            return task.poll();
+        } catch (RetriableException e) {
+            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
+            // Do nothing. Let the framework poll whenever it's ready.
+            return null;
+        }
+    }
+
+    /**
+     * Convert the source record into a producer record.
+     *
+     * @param record the transformed record
+     * @return the producer record which can be sent over to Kafka. A null is returned if the input is null or
+     * if an error was encountered during any of the converter stages.
+     */
+    private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
+        if (record == null) {
+            return null;
+        }
+
+        RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
+
+        byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()),
+                Stage.KEY_CONVERTER, keyConverter.getClass());
+
+        byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()),
+                Stage.VALUE_CONVERTER, valueConverter.getClass());
+
+        if (retryWithToleranceOperator.failed()) {
+            return null;
+        }
+
+        return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
+                ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
+    }
+
     /**
      * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
      * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
@@ -241,19 +283,16 @@ class WorkerSourceTask extends WorkerTask {
         recordBatch(toSend.size());
         final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
-            final SourceRecord record = transformationChain.apply(preTransformRecord);
 
-            if (record == null) {
+            retryWithToleranceOperator.sourceRecord(preTransformRecord);
+            final SourceRecord record = transformationChain.apply(preTransformRecord);
+            final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
+            if (producerRecord == null || retryWithToleranceOperator.failed()) {
                 counter.skipRecord();
                 commitTaskRecord(preTransformRecord);
                 continue;
             }
 
-            RecordHeaders headers = convertHeaderFor(record);
-            byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
-            byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
-                    ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
             log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
             // We need this queued first since the callback could happen immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- we always save the previously attempted
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index d563f9b..6ea1ddd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.AbstractStatus.State;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -60,11 +61,14 @@ abstract class WorkerTask implements Runnable {
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
     private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
 
+    protected final RetryWithToleranceOperator retryWithToleranceOperator;
+
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
                       TargetState initialState,
                       ClassLoader loader,
-                      ConnectMetrics connectMetrics) {
+                      ConnectMetrics connectMetrics,
+                      RetryWithToleranceOperator retryWithToleranceOperator) {
         this.id = id;
         this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
         this.statusListener = taskMetricsGroup;
@@ -73,6 +77,7 @@ abstract class WorkerTask implements Runnable {
         this.stopping = false;
         this.cancelled = false;
         this.taskMetricsGroup.recordState(this.targetState);
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
 
     public ConnectorTaskId id() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
new file mode 100644
index 0000000..454a619
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.Collections.singleton;
+
+/**
+ * Write the original consumed record into a dead letter queue. The dead letter queue is a Kafka topic located
+ * on the same cluster used by the worker to maintain internal topics. Each connector is typically configured
+ * with its own Kafka topic dead letter queue. By default, the topic name is not set, and if the
+ * connector config doesn't specify one, this feature is disabled.
+ */
+public class DeadLetterQueueReporter implements ErrorReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
+
+    private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    public static final String PREFIX = "errors.deadletterqueue";
+
+    public static final String DLQ_TOPIC_NAME = "topic.name";
+    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic where these messages are written to.";
+    public static final String DLQ_TOPIC_DEFAULT = "";
+
+    private DeadLetterQueueReporterConfig config;
+    private KafkaProducer<byte[], byte[]> kafkaProducer;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC);
+
+    public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+                                                         ConnectorConfig connConfig, Map<String, Object> producerProps) {
+        String topic = connConfig.getString(PREFIX + "." + DLQ_TOPIC_NAME);
+
+        try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
+            if (!admin.listTopics().names().get().contains(topic)) {
+                log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
+                NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR);
+                admin.createTopics(singleton(schemaTopicRequest)).all().get();
+            }
+        } catch (InterruptedException e) {
+            throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof TopicExistsException)) {
+                throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
+            }
+        }
+
+        KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
+        return new DeadLetterQueueReporter(dlqProducer);
+    }
+
+    /**
+     * Initialize the dead letter queue reporter with a {@link KafkaProducer}.
+     *
+     * @param kafkaProducer a Kafka Producer to produce the original consumed records.
+     */
+    // Visible for testing
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer) {
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new DeadLetterQueueReporterConfig(configs);
+    }
+
+    @Override
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    /**
+     * Write the raw records into a Kafka topic.
+     *
+     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     */
+    public void report(ProcessingContext context) {
+        if (config.topic().isEmpty()) {
+            return;
+        }
+
+        errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
+
+        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
+        if (originalMessage == null) {
+            errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            return;
+        }
+
+        ProducerRecord<byte[], byte[]> producerRecord;
+        if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
+            producerRecord = new ProducerRecord<>(config.topic(), null,
+                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
+        } else {
+            producerRecord = new ProducerRecord<>(config.topic(), null, originalMessage.timestamp(),
+                    originalMessage.key(), originalMessage.value(), originalMessage.headers());
+        }
+
+        this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. topic=" + config.topic(), exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        });
+    }
+
+    static class DeadLetterQueueReporterConfig extends AbstractConfig {
+        public DeadLetterQueueReporterConfig(Map<?, ?> originals) {
+            super(CONFIG_DEF, originals, true);
+        }
+
+        /**
+         * @return name of the dead letter queue topic.
+         */
+        public String topic() {
+            return getString(DLQ_TOPIC_NAME);
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
new file mode 100644
index 0000000..c589012
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+
+/**
+ * Contains various sensors used for monitoring errors.
+ */
+public class ErrorHandlingMetrics {
+
+    private final Time time = new SystemTime();
+
+    private final ConnectMetrics.MetricGroup metricGroup;
+
+    // metrics
+    private final Sensor recordProcessingFailures;
+    private final Sensor recordProcessingErrors;
+    private final Sensor recordsSkipped;
+    private final Sensor retries;
+    private final Sensor errorsLogged;
+    private final Sensor dlqProduceRequests;
+    private final Sensor dlqProduceFailures;
+    private long lastErrorTime = 0;
+
+    // for testing only
+    public ErrorHandlingMetrics() {
+        this(new ConnectorTaskId("noop-connector", -1),
+                new ConnectMetrics("noop-worker", new SystemTime(), 2, 3000, Sensor.RecordingLevel.INFO.toString(),
+                        new ArrayList<>()));
+    }
+
+    public ErrorHandlingMetrics(ConnectorTaskId id, ConnectMetrics connectMetrics) {
+
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.taskErrorHandlingGroupName(),
+                registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task()));
+
+        // prevent collisions by removing any previously created metrics in this group.
+        metricGroup.close();
+
+        recordProcessingFailures = metricGroup.sensor("total-record-failures");
+        recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures), new Total());
+
+        recordProcessingErrors = metricGroup.sensor("total-record-errors");
+        recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors), new Total());
+
+        recordsSkipped = metricGroup.sensor("total-records-skipped");
+        recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), new Total());
+
+        retries = metricGroup.sensor("total-retries");
+        retries.add(metricGroup.metricName(registry.retries), new Total());
+
+        errorsLogged = metricGroup.sensor("total-errors-logged");
+        errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new Total());
+
+        dlqProduceRequests = metricGroup.sensor("deadletterqueue-produce-requests");
+        dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new Total());
+
+        dlqProduceFailures = metricGroup.sensor("deadletterqueue-produce-failures");
+        dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new Total());
+
+        metricGroup.addValueMetric(registry.lastErrorTimestamp, now -> lastErrorTime);
+    }
+
+    /**
+     * Increment the number of failed operations (retriable and non-retriable).
+     */
+    public void recordFailure() {
+        recordProcessingFailures.record();
+    }
+
+    /**
+     * Increment the number of operations which could not be successfully executed.
+     */
+    public void recordError() {
+        recordProcessingErrors.record();
+    }
+
+    /**
+     * Increment the number of records skipped.
+     */
+    public void recordSkipped() {
+        recordsSkipped.record();
+    }
+
+    /**
+     * The number of retries made while executing operations.
+     */
+    public void recordRetry() {
+        retries.record();
+    }
+
+    /**
+     * The number of errors logged by the {@link LogReporter}.
+     */
+    public void recordErrorLogged() {
+        errorsLogged.record();
+    }
+
+    /**
+     * The number of produce requests to the {@link DeadLetterQueueReporter}.
+     */
+    public void recordDeadLetterQueueProduceRequest() {
+        dlqProduceRequests.record();
+    }
+
+    /**
+     * The number of produce requests to the {@link DeadLetterQueueReporter} which failed to be successfully produced into Kafka.
+     */
+    public void recordDeadLetterQueueProduceFailed() {
+        dlqProduceFailures.record();
+    }
+
+    /**
+     * Record the time of error.
+     */
+    public void recordErrorTimestamp() {
+        this.lastErrorTime = time.milliseconds();
+    }
+
+    /**
+     * @return the metric group for this class.
+     */
+    public ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
new file mode 100644
index 0000000..e71b6bc
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ * Report an error using the information contained in the {@link ProcessingContext}.
+ */
+public interface ErrorReporter extends Configurable {
+
+    /**
+     * Report an error.
+     *
+     * @param context the processing context (cannot be null).
+     */
+    void report(ProcessingContext context);
+
+    /**
+     * Provides the container for error handling metrics to implementations. This method will be called once the error
+     * reporter object is instantiated.
+     *
+     * @param errorHandlingMetrics metrics for error handling (cannot be null).
+     */
+    void metrics(ErrorHandlingMetrics errorHandlingMetrics);
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
new file mode 100644
index 0000000..1d2c08f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Writes errors and their context to application logs.
+ */
+public class LogReporter implements ErrorReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(LogReporter.class);
+
+    public static final String PREFIX = "errors.log";
+
+    public static final String LOG_ENABLE = "enable";
+    public static final String LOG_ENABLE_DOC = "If true, log to application logs the errors and the information describing where they occurred.";
+    public static final boolean LOG_ENABLE_DEFAULT = false;
+
+    public static final String LOG_INCLUDE_MESSAGES = "include.messages";
+    public static final String LOG_INCLUDE_MESSAGES_DOC = "If true, include in the application log the Connect key, value, and other details of records that resulted in errors and failures.";
+    public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false;
+
+    private final ConnectorTaskId id;
+
+    private LogReporterConfig config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    public LogReporter(ConnectorTaskId id) {
+        this.id = id;
+    }
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, LOG_ENABLE_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_ENABLE_DOC)
+                .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_INCLUDE_MESSAGES_DOC);
+
+    /**
+     * Log error context.
+     *
+     * @param context the processing context.
+     */
+    @Override
+    public void report(ProcessingContext context) {
+        if (!config.isEnabled()) {
+            return;
+        }
+
+        if (!context.failed()) {
+            return;
+        }
+
+        log.error(message(context), context.error());
+        errorHandlingMetrics.recordErrorLogged();
+    }
+
+    @Override
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    // Visible for testing
+    String message(ProcessingContext context) {
+        return String.format("Error encountered in task %s. %s", String.valueOf(id), context.toString(config.canLogMessages()));
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new LogReporterConfig(configs);
+    }
+
+    private static class LogReporterConfig extends AbstractConfig {
+        public LogReporterConfig(Map<?, ?> originals) {
+            super(CONFIG_DEF, originals, true);
+        }
+
+        /**
+         * @return true, if logging of error context is desired; false otherwise.
+         */
+        public boolean isEnabled() {
+            return getBoolean(LOG_ENABLE);
+        }
+
+        /**
+         * @return if false, the connect record which caused the exception is not logged.
+         */
+        public boolean canLogMessages() {
+            return getBoolean(LOG_INCLUDE_MESSAGES);
+        }
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java
new file mode 100644
index 0000000..3e0f792
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A recoverable operation evaluated in the connector pipeline.
+ *
+ * @param <V> return type of the result of the operation.
+ */
+public interface Operation<V> extends Callable<V> {
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
new file mode 100644
index 0000000..f826d74
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
+ * to exist per task in a JVM.
+ */
+class ProcessingContext {
+
+    private Collection<ErrorReporter> reporters = Collections.emptyList();
+
+    private ConsumerRecord<byte[], byte[]> consumedMessage;
+    private SourceRecord sourceRecord;
+
+    /**
+     * The following fields need to be reset every time a new record is seen.
+     */
+
+    private Stage position;
+    private Class<?> klass;
+    private int attempt;
+    private Throwable error;
+
+    /**
+     * Reset the internal fields before executing operations on a new record.
+     */
+    private void reset() {
+        attempt = 0;
+        position = null;
+        klass = null;
+        error = null;
+    }
+
+    /**
+     * Set the record consumed from Kafka in a sink connector.
+     *
+     * @param consumedMessage the record
+     */
+    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
+        this.consumedMessage = consumedMessage;
+        reset();
+    }
+
+    /**
+     * @return the record consumed from Kafka; may be null
+     */
+    public ConsumerRecord<byte[], byte[]> consumerRecord() {
+        return consumedMessage;
+    }
+
+    /**
+     * @return the source record being processed.
+     */
+    public SourceRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param record the source record
+     */
+    public void sourceRecord(SourceRecord record) {
+        this.sourceRecord = record;
+        reset();
+    }
+
+    /**
+     * Set the stage in the connector pipeline which is currently executing.
+     *
+     * @param position the stage
+     */
+    public void position(Stage position) {
+        this.position = position;
+    }
+
+    /**
+     * @return the stage in the connector pipeline which is currently executing.
+     */
+    public Stage stage() {
+        return position;
+    }
+
+    /**
+     * @return the class which is going to execute the current operation.
+     */
+    public Class<?> executingClass() {
+        return klass;
+    }
+
+    /**
+     * @param klass set the class which is currently executing.
+     */
+    public void executingClass(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * A helper method to set both the stage and the class.
+     *
+     * @param stage the stage
+     * @param klass the class which will execute the operation in this stage.
+     */
+    public void currentContext(Stage stage, Class<?> klass) {
+        position(stage);
+        executingClass(klass);
+    }
+
+    /**
+     * Report errors. Should be called only if an error was encountered while executing the operation.
+     */
+    public void report() {
+        for (ErrorReporter reporter: reporters) {
+            reporter.report(this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return toString(false);
+    }
+
+    public String toString(boolean includeMessage) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Executing stage '");
+        builder.append(stage().name());
+        builder.append("' with class '");
+        builder.append(executingClass() == null ? "null" : executingClass().getName());
+        builder.append('\'');
+        if (includeMessage && sourceRecord() != null) {
+            builder.append(", where source record is = ");
+            builder.append(sourceRecord());
+        } else if (includeMessage && consumerRecord() != null) {
+            ConsumerRecord<byte[], byte[]> msg = consumerRecord();
+            builder.append(", where consumed record is ");
+            builder.append("{topic='").append(msg.topic()).append('\'');
+            builder.append(", partition=").append(msg.partition());
+            builder.append(", offset=").append(msg.offset());
+            if (msg.timestampType() == TimestampType.CREATE_TIME || msg.timestampType() == TimestampType.LOG_APPEND_TIME) {
+                builder.append(", timestamp=").append(msg.timestamp());
+                builder.append(", timestampType=").append(msg.timestampType());
+            }
+            builder.append("}");
+        }
+        builder.append('.');
+        return builder.toString();
+    }
+
+    /**
+     * @param attempt the number of attempts made to execute the current operation.
+     */
+    public void attempt(int attempt) {
+        this.attempt = attempt;
+    }
+
+    /**
+     * @return the number of attempts made to execute the current operation.
+     */
+    public int attempt() {
+        return attempt;
+    }
+
+    /**
+     * @return the error (if any) which was encountered while processing the current stage.
+     */
+    public Throwable error() {
+        return error;
+    }
+
+    /**
+     * The error (if any) which was encountered while processing the current stage.
+     *
+     * @param error the error
+     */
+    public void error(Throwable error) {
+        this.error = error;
+    }
+
+    /**
+     * @return true, if the last operation encountered an error; false otherwise
+     */
+    public boolean failed() {
+        return error() != null;
+    }
+
+    /**
+     * Set the error reporters for this connector.
+     *
+     * @param reporters the error reporters (should not be null).
+     */
+    public void reporters(Collection<ErrorReporter> reporters) {
+        Objects.requireNonNull(reporters);
+        this.reporters = reporters;
+    }
+
+}
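
For orientation, the per-record lifecycle that the operator in the next file drives through this context looks roughly like the sketch below. It is illustrative only: ProcessingContext is package-private, so the sketch lives in the errors package, and the reporter, record and exception are stand-ins.

    package org.apache.kafka.connect.runtime.errors;

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.connect.json.JsonConverter;

    class ProcessingContextSketch {
        static void lifecycle(ErrorReporter reporter, ConsumerRecord<byte[], byte[]> consumed, Throwable conversionError) {
            ProcessingContext context = new ProcessingContext();
            context.reporters(Collections.singletonList(reporter)); // installed once per task
            context.consumerRecord(consumed);                       // new record: resets attempt, position and error
            context.currentContext(Stage.VALUE_CONVERTER, JsonConverter.class);
            context.attempt(1);
            context.error(conversionError);                         // record the failure seen in this stage
            if (context.failed()) {
                context.report();                                   // fan out to every configured reporter
            }
        }
    }
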
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
new file mode 100644
index 0000000..941abf3
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * Attempt to recover a failed operation with retries and tolerance limits.
+ * <p>
+ *
+ * A retry is attempted if the operation throws a {@link RetriableException}. Retries are accompanied by exponential backoffs, starting with
+ * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link RetryWithToleranceOperatorConfig#retryDelayMax()}.
+ * Including the first attempt and all retries, the total time taken to evaluate the operation should be within
+ * {@link RetryWithToleranceOperatorConfig#retryTimeout()} millis.
+ * <p>
+ *
+ * This executor will tolerate failures, as specified by {@link RetryWithToleranceOperatorConfig#toleranceLimit()}.
+ * For transformations and converters, all exceptions are tolerated. For other operations, only {@link RetriableException}s are tolerated.
+ * <p>
+ *
+ * There are three outcomes to executing an operation. It might succeed, in which case the result is returned to the caller.
+ * If it fails, this class does one of two things: (1) if the failure occurred due to a tolerable exception, the
+ * error reason is recorded in the {@link ProcessingContext} and null is returned, or (2) if the exception is not tolerated,
+ * it is wrapped into a ConnectException and rethrown to the caller.
+ * <p>
+ */
+public class RetryWithToleranceOperator {
+
+    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
+
+    public static final String RETRY_TIMEOUT = "retry.timeout";
+    public static final String RETRY_TIMEOUT_DOC = "The total duration in milliseconds a failed operation will be retried for.";
+    public static final long RETRY_TIMEOUT_DEFAULT = 0;
+
+    public static final String RETRY_DELAY_MAX_MS = "retry.delay.max.ms";
+    public static final String RETRY_DELAY_MAX_MS_DOC = "The maximum duration between two consecutive retries (in milliseconds).";
+    public static final long RETRY_DELAY_MAX_MS_DEFAULT = 60000;
+
+    public static final long RETRIES_DELAY_MIN_MS = 300;
+
+    public static final String TOLERANCE_LIMIT = "allowed.max";
+    public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we exceed the specified number of errors overall.";
+    public static final String TOLERANCE_LIMIT_DEFAULT = "none";
+
+    // for testing only
+    public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator();
+    static {
+        NOOP_OPERATOR.configure(Collections.emptyMap());
+        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics());
+    }
+
+    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
+    static {
+        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
+    }
+
+    private long totalFailures = 0;
+    private final Time time;
+    private RetryWithToleranceOperatorConfig config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    protected ProcessingContext context = new ProcessingContext();
+
+    public RetryWithToleranceOperator() {
+        this(new SystemTime());
+    }
+
+    // Visible for testing
+    public RetryWithToleranceOperator(Time time) {
+        this.time = time;
+    }
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(RETRY_TIMEOUT, ConfigDef.Type.LONG, RETRY_TIMEOUT_DEFAULT, ConfigDef.Importance.HIGH, RETRY_TIMEOUT_DOC)
+                .define(RETRY_DELAY_MAX_MS, ConfigDef.Type.LONG, RETRY_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, RETRY_DELAY_MAX_MS_DOC)
+                .define(TOLERANCE_LIMIT, ConfigDef.Type.STRING, TOLERANCE_LIMIT_DEFAULT, in("none", "all"), ConfigDef.Importance.HIGH, TOLERANCE_LIMIT_DOC);
+    }
+
+    /**
+     * Execute the recoverable operation. If the processing context is already in a failed state, the operation is
+     * skipped and null is returned, leaving the existing failure in place.
+     *
+     * @param operation the recoverable operation
+     * @param <V> return type of the result of the operation.
+     * @return result of the operation
+     */
+    public <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+        context.currentContext(stage, executingClass);
+
+        if (context.failed()) {
+            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
+            return null;
+        }
+
+        try {
+            Class<? extends Exception> ex = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
+            return execAndHandleError(operation, ex);
+        } finally {
+            if (context.failed()) {
+                errorHandlingMetrics.recordError();
+                context.report();
+            }
+        }
+    }
+
+    /**
+     * Attempt to execute an operation. Retry if a {@link RetriableException} is raised. Re-throw everything else.
+     *
+     * @param operation the operation to be executed.
+     * @param <V> the return type of the result of the operation.
+     * @return the result of the operation.
+     * @throws Exception rethrow if a non-retriable Exception is thrown by the operation
+     */
+    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
+        int attempt = 0;
+        long startTime = time.milliseconds();
+        long deadline = startTime + config.retryTimeout();
+        do {
+            try {
+                attempt++;
+                return operation.call();
+            } catch (RetriableException e) {
+                log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
+                errorHandlingMetrics.recordFailure();
+                if (checkRetry(startTime)) {
+                    backoff(attempt, deadline);
+                    if (Thread.currentThread().isInterrupted()) {
+                        log.trace("Thread was interrupted. Marking operation as failed.");
+                        context.error(e);
+                        return null;
+                    }
+                    errorHandlingMetrics.recordRetry();
+                } else {
+                    log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
+                    context.error(e);
+                    return null;
+                }
+            } finally {
+                context.attempt(attempt);
+            }
+        } while (true);
+    }
+
+    /**
+     * Execute a given operation multiple times (if needed), and tolerate certain exceptions.
+     *
+     * @param operation the operation to be executed.
+     * @param tolerated the class of exceptions which can be tolerated.
+     * @param <V> The return type of the result of the operation.
+     * @return the result of the operation
+     */
+    // Visible for testing
+    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
+        try {
+            V result = execAndRetry(operation);
+            if (context.failed()) {
+                markAsFailed();
+                errorHandlingMetrics.recordSkipped();
+            }
+            return result;
+        } catch (Exception e) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            context.error(e);
+
+            if (!tolerated.isAssignableFrom(e.getClass())) {
+                throw new ConnectException("Unhandled exception in error handler", e);
+            }
+
+            if (!withinToleranceLimits()) {
+                throw new ConnectException("Tolerance exceeded in error handler", e);
+            }
+
+            errorHandlingMetrics.recordSkipped();
+            return null;
+        }
+    }
+
+    // Visible for testing
+    void markAsFailed() {
+        errorHandlingMetrics.recordErrorTimestamp();
+        totalFailures++;
+    }
+
+    // Visible for testing
+    boolean withinToleranceLimits() {
+        switch (config.toleranceLimit()) {
+            case NONE:
+                if (totalFailures > 0) return false;
+            case ALL:
+                return true;
+            default:
+                throw new ConfigException("Unknown tolerance type: " + config.toleranceLimit());
+        }
+    }
+
+    // Visible for testing
+    boolean checkRetry(long startTime) {
+        return (time.milliseconds() - startTime) < config.retryTimeout();
+    }
+
+    // Visible for testing
+    void backoff(int attempt, long deadline) {
+        int numRetry = attempt - 1;
+        long delay = RETRIES_DELAY_MIN_MS << numRetry;
+        if (delay > config.retryDelayMax()) {
+            delay = ThreadLocalRandom.current().nextLong(config.retryDelayMax());
+        }
+        if (delay + time.milliseconds() > deadline) {
+            delay = deadline - time.milliseconds();
+        }
+        log.debug("Sleeping for {} millis", delay);
+        time.sleep(delay);
+    }
+
+    public void configure(Map<String, ?> configs) {
+        config = new RetryWithToleranceOperatorConfig(configs);
+    }
+
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    static class RetryWithToleranceOperatorConfig extends AbstractConfig {
+        public RetryWithToleranceOperatorConfig(Map<?, ?> originals) {
+            super(getConfigDef(), originals, true);
+        }
+
+        /**
+         * @return the total time an operation can take to succeed (including the first attempt and retries).
+         */
+        public long retryTimeout() {
+            return getLong(RETRY_TIMEOUT);
+        }
+
+        /**
+         * @return the maximum delay between two subsequent retries in milliseconds.
+         */
+        public long retryDelayMax() {
+            return getLong(RETRY_DELAY_MAX_MS);
+        }
+
+        /**
+         * @return determine how many errors to tolerate.
+         */
+        public ToleranceType toleranceLimit() {
+            return ToleranceType.fromString(getString(TOLERANCE_LIMIT));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RetryWithToleranceOperator{" +
+                "config=" + config +
+                '}';
+    }
+
+    /**
+     * Set the error reporters for this connector.
+     *
+     * @param reporters the error reporters (should not be null).
+     */
+    public void reporters(List<ErrorReporter> reporters) {
+        this.context.reporters(reporters);
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param preTransformRecord the source record
+     */
+    public void sourceRecord(SourceRecord preTransformRecord) {
+        this.context.sourceRecord(preTransformRecord);
+    }
+
+    /**
+     * Set the record consumed from Kafka in a sink connector.
+     *
+     * @param consumedMessage the record
+     */
+    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
+        this.context.consumerRecord(consumedMessage);
+    }
+
+    /**
+     * @return true, if the last operation encountered an error; false otherwise
+     */
+    public boolean failed() {
+        return this.context.failed();
+    }
+}
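
A minimal sketch of how the operator might be configured and used to guard a single conversion, mirroring the settings used by the new ErrorHandlingTaskTest further down. The converter, topic and raw payload are hypothetical, and the sketch is placed in the errors package purely for convenience:

    package org.apache.kafka.connect.runtime.errors;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;

    class RetryOperatorSketch {
        static SchemaAndValue convertWithRetries(Converter valueConverter, String topic, byte[] rawValue) {
            Map<String, Object> props = new HashMap<>();
            props.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "60000");     // keep retrying a failing stage for up to a minute
            props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000"); // cap the exponential backoff at 5 seconds
            props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");     // tolerate every failure instead of killing the task

            RetryWithToleranceOperator operator = new RetryWithToleranceOperator();
            operator.configure(props);
            operator.metrics(new ErrorHandlingMetrics());

            // Converter stages tolerate all exceptions, so a record that keeps failing
            // yields null here rather than propagating an exception to the task.
            return operator.execute(() -> valueConverter.toConnectData(topic, rawValue),
                    Stage.VALUE_CONVERTER, valueConverter.getClass());
        }
    }

With these settings, backoff() sleeps 300 ms (RETRIES_DELAY_MIN_MS) before the first retry and doubles the delay for each subsequent retry; once the doubled value would exceed retry.delay.max.ms a random delay below that cap is used instead, and the sleep is always trimmed so the retry.timeout deadline is not overshot.
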
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
new file mode 100644
index 0000000..b9aa1f2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+/**
+ * A logical stage in a Connect pipeline.
+ */
+public enum Stage {
+
+    /**
+     * When calling the poll() method on a source task
+     */
+    TASK_POLL,
+
+    /**
+     * When calling the put() method on a sink task
+     */
+    TASK_PUT,
+
+    /**
+     * When running any transformation operation on a record
+     */
+    TRANSFORMATION,
+
+    /**
+     * When using the key converter to serialize/deserialize keys in ConnectRecords
+     */
+    KEY_CONVERTER,
+
+    /**
+     * When using the value converter to serialize/deserialize values in ConnectRecords
+     */
+    VALUE_CONVERTER,
+
+    /**
+     * When using the header converter to serialize/deserialize headers in ConnectRecords
+     */
+    HEADER_CONVERTER,
+
+    /**
+     * When producing to Kafka topic
+     */
+    KAFKA_PRODUCE,
+
+    /**
+     * When consuming from a Kafka topic
+     */
+    KAFKA_CONSUME
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
new file mode 100644
index 0000000..79956ac
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import java.util.Locale;
+
+/**
+ * The different levels of error tolerance.
+ */
+public enum ToleranceType {
+
+    /**
+     * Tolerate no errors.
+     */
+    NONE,
+
+    /**
+     * Tolerate all errors.
+     */
+    ALL;
+
+    public static ToleranceType fromString(String typeStr) {
+        return "ALL".equals(typeStr.toUpperCase(Locale.ROOT)) ? ToleranceType.ALL : ToleranceType.NONE;
+    }
+}
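
fromString is deliberately lenient: anything other than "all" (case-insensitive) collapses to the strict default, as this small, illustrative snippet shows.

    import org.apache.kafka.connect.runtime.errors.ToleranceType;

    class ToleranceTypeSketch {
        public static void main(String[] args) {
            System.out.println(ToleranceType.fromString("all"));   // ALL
            System.out.println(ToleranceType.fromString("None"));  // NONE
            System.out.println(ToleranceType.fromString("bogus")); // NONE: unrecognized values fall back to NONE
        }
    }
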
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
new file mode 100644
index 0000000..5a8bcc5
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.LogReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
+@PowerMockIgnore("javax.management.*")
+public class ErrorHandlingTaskTest {
+
+    private static final String TOPIC = "test";
+    private static final int PARTITION1 = 12;
+    private static final int PARTITION2 = 13;
+    private static final long FIRST_OFFSET = 45;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+    }
+
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final Map<String, String> OPERATION_EXECUTOR_PROPS = new HashMap<>();
+
+    static {
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        // wait up to 1 minute for an operation
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "60000");
+        // wait up to 5 seconds between subsequent retries
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
+    }
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private TargetState initialState = TargetState.STARTED;
+    private Time time;
+    private MockConnectMetrics metrics;
+    @SuppressWarnings("unused")
+    @Mock
+    private SinkTask sinkTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private SourceTask sourceTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @SuppressWarnings("unused")
+    @Mock
+    private HeaderConverter headerConverter;
+    private WorkerSinkTask workerSinkTask;
+    private WorkerSourceTask workerSourceTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaProducer<byte[], byte[]> producer;
+
+    @Mock
+    OffsetStorageReader offsetReader;
+    @Mock
+    OffsetStorageWriter offsetWriter;
+
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    @SuppressWarnings("unused")
+    @Mock
+    private TaskStatus.Listener statusListener;
+
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, 0, 0);
+        metrics = new MockConnectMetrics();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        workerConfig = new StandaloneConfig(workerProps);
+        errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) {
+            metrics.stop();
+        }
+    }
+
+    @Test
+    public void testErrorHandlingInSinkTasks() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSinkTask(initialState, retryWithToleranceOperator);
+
+        expectInitializeTask();
+
+        // valid json
+        ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
+        // bad json
+        ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+
+        sinkTask.put(EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+
+        PowerMock.replayAll();
+
+        workerSinkTask.initialize(TASK_CONFIG);
+        workerSinkTask.initializeAndStart();
+        workerSinkTask.iteration();
+
+        workerSinkTask.iteration();
+
+        // two records were consumed from Kafka
+        assertSinkMetricValue("sink-record-read-total", 2.0);
+        // only one was written to the task
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        // one record completely failed (converter issues)
+        assertErrorHandlingMetricValue("total-record-errors", 1.0);
+        // 2 failures in the transformation, and 1 in the converter
+        assertErrorHandlingMetricValue("total-record-failures", 3.0);
+        // one record completely failed (converter issues), and thus was skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 1.0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorHandlingInSourceTasks() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        // valid source records
+        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
+        Struct struct1 = new Struct(valSchema).put("val", 1234);
+        SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
+        Struct struct2 = new Struct(valSchema).put("val", 6789);
+        SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
+
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+
+        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+
+        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+        sourceTask.initialize(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        sourceTask.start(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.execute();
+
+        // two records were polled from the source task
+        assertSourceMetricValue("source-record-poll-total", 2.0);
+        // no writes were recorded (the mocked producer never completes the sends)
+        assertSourceMetricValue("source-record-write-total", 0.0);
+        // no record failed terminally; every operation eventually succeeded after retries
+        assertErrorHandlingMetricValue("total-record-errors", 0.0);
+        // each record failed twice in the transformation before succeeding (4 failures in total)
+        assertErrorHandlingMetricValue("total-record-failures", 4.0);
+        // hence, no records were skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 0.0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSourceTask(initialState, retryWithToleranceOperator, badConverter());
+
+        // valid source records
+        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
+        Struct struct1 = new Struct(valSchema).put("val", 1234);
+        SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
+        Struct struct2 = new Struct(valSchema).put("val", 6789);
+        SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
+
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+
+        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+
+        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+        sourceTask.initialize(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        sourceTask.start(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.execute();
+
+        // two records were polled from the source task
+        assertSourceMetricValue("source-record-poll-total", 2.0);
+        // no writes were recorded (the mocked producer never completes the sends)
+        assertSourceMetricValue("source-record-write-total", 0.0);
+        // no record failed terminally; every operation eventually succeeded after retries
+        assertErrorHandlingMetricValue("total-record-errors", 0.0);
+        // each record failed twice in the transformation and twice in the converter (8 failures in total)
+        assertErrorHandlingMetricValue("total-record-failures", 8.0);
+        // hence, no records were skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 0.0);
+
+        PowerMock.verifyAll();
+    }
+
+    private void assertSinkMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = workerSinkTask.sinkTaskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void assertSourceMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sourceTaskGroup = workerSourceTask.sourceTaskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sourceTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void assertErrorHandlingMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup errorGroup = errorHandlingMetrics.metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(errorGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerSinkTask, "createConsumer").andReturn(consumer);
+        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
+    private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+        JsonConverter converter = new JsonConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+
+        TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
+
+        workerSinkTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer"},
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, converter, converter,
+                headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
+    }
+
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+        JsonConverter converter = new JsonConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+
+        createSourceTask(initialState, retryWithToleranceOperator, converter);
+    }
+
+    private Converter badConverter() {
+        FaultyConverter converter = new FaultyConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+        return converter;
+    }
+
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
+        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
+
+        workerSourceTask = PowerMock.createPartialMock(
+                WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
+                taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
+                producer, offsetReader, offsetWriter, workerConfig, metrics, pluginLoader, time, retryWithToleranceOperator);
+    }
+
+    private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
+        return new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition(record.topic(), record.partition()), singletonList(record)));
+    }
+
+    private abstract static class TestSinkTask extends SinkTask {
+    }
+
+    static class FaultyConverter extends JsonConverter {
+        private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
+        private int invocations = 0;
+
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            if (value == null) {
+                return super.fromConnectData(topic, schema, null);
+            }
+            invocations++;
+            if (invocations % 3 == 0) {
+                log.debug("Succeeding record: {} where invocations={}", value, invocations);
+                return super.fromConnectData(topic, schema, value);
+            } else {
+                log.debug("Failing record: {} at invocations={}", value, invocations);
+                throw new RetriableException("Bad invocations " + invocations + " for mod 3");
+            }
+        }
+    }
+
+    static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
+
+        private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
+
+        private static final String MOD_CONFIG = "mod";
+        private static final int MOD_CONFIG_DEFAULT = 3;
+
+        public static final ConfigDef CONFIG_DEF = new ConfigDef()
+                .define(MOD_CONFIG, ConfigDef.Type.INT, MOD_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, "Pass records without failure only if invocation count % mod == 0");
+
+        private int mod = MOD_CONFIG_DEFAULT;
+
+        private int invocations = 0;
+
+        @Override
+        public R apply(R record) {
+            invocations++;
+            if (invocations % mod == 0) {
+                log.debug("Succeeding record: {} where invocations={}", record, invocations);
+                return record;
+            } else {
+                log.debug("Failing record: {} at invocations={}", record, invocations);
+                throw new RetriableException("Bad invocations " + invocations + " for mod " + mod);
+            }
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+
+        @Override
+        public void close() {
+            log.info("Shutting down transform");
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+            mod = Math.max(config.getInt(MOD_CONFIG), 2);
+            log.info("Configuring {}. Setting mod to {}", this.getClass(), mod);
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 9568e78..ff8507c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -27,20 +27,21 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.common.utils.MockTime;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
@@ -161,7 +162,10 @@ public class WorkerSinkTaskTest {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics,
+                keyConverter, valueConverter, headerConverter,
+                transformationChain, pluginLoader, time,
+                RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b09f847..61d8778 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -137,7 +138,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
-                valueConverter, headerConverter, TransformationChain.noOp(), pluginLoader, time);
+                valueConverter, headerConverter,
+                new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR),
+                pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR);
 
         recordsReturned = 0;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index ca26e4e..77f4ad9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -147,7 +148,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
-                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
+                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 78f2836..de0ba8a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
@@ -56,10 +57,12 @@ public class WorkerTaskTest {
     private ConnectMetrics metrics;
     @Mock private TaskStatus.Listener statusListener;
     @Mock private ClassLoader loader;
+    RetryWithToleranceOperator retryWithToleranceOperator;
 
     @Before
     public void setup() {
         metrics = new MockConnectMetrics();
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
     }
 
     @After
@@ -77,9 +80,10 @@ public class WorkerTaskTest {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -123,9 +127,10 @@ public class WorkerTaskTest {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -162,9 +167,10 @@ public class WorkerTaskTest {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f062436..d29eef5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -47,7 +49,6 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -485,14 +486,15 @@ public class WorkerTest extends ThreadedTest {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
                 anyObject(ConnectMetrics.class),
                 anyObject(ClassLoader.class),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -622,14 +624,15 @@ public class WorkerTest extends ThreadedTest {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -713,14 +716,15 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
                 EasyMock.capture(headerConverter),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
new file mode 100644
index 0000000..f6a0507
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class ErrorReporterTest {
+
+    private static final String TOPIC = "test-topic";
+    private static final String DLQ_TOPIC = "test-topic-errors";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+
+    @Mock
+    KafkaProducer<byte[], byte[]> producer;
+
+    @Mock
+    Future<RecordMetadata> metadata;
+
+    private HashMap<String, Object> config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+    private MockConnectMetrics metrics;
+
+    @Before
+    public void setup() {
+        config = new HashMap<>();
+        metrics = new MockConnectMetrics();
+        errorHandlingMetrics = new ErrorHandlingMetrics(new ConnectorTaskId("connector-", 1), metrics);
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) {
+            metrics.stop();
+        }
+    }
+
+    @Test
+    public void testDLQConfigWithEmptyTopicName() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config);
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException());
+        replay(producer);
+
+        // Since the DLQ topic name is empty, report() should be a no-op.
+        // If it did attempt to send to the DLQ, the producer mock would throw a RuntimeException and fail the test.
+        deadLetterQueueReporter.report(context);
+    }
+
+    @Test
+    public void testDLQConfigWithValidTopicName() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
+        replay(producer);
+
+        deadLetterQueueReporter.report(context);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReportDLQTwice() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata).times(2);
+        replay(producer);
+
+        deadLetterQueueReporter.report(context);
+        deadLetterQueueReporter.report(context);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLogOnDisabledLogReporter() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config);
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+        context.error(new RuntimeException());
+
+        // the log reporter is not enabled, so reporting a context (even one carrying an error) should log nothing.
+        logReporter.report(context);
+        assertErrorHandlingMetricValue("total-errors-logged", 0.0);
+    }
+
+    @Test
+    public void testLogOnEnabledLogReporter() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+        context.error(new RuntimeException());
+
+        // the log reporter is enabled, so reporting a context with an error should log it and increment the metric.
+        logReporter.report(context);
+        assertErrorHandlingMetricValue("total-errors-logged", 1.0);
+    }
+
+    @Test
+    public void testLogMessageWithNoRecords() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        String msg = logReporter.message(context);
+        assertEquals("Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class " +
+                "'org.apache.kafka.connect.json.JsonConverter'.", msg);
+    }
+
+    @Test
+    public void testLogMessageWithSinkRecords() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.configure(config(LogReporter.LOG_INCLUDE_MESSAGES, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        String msg = logReporter.message(context);
+        assertEquals("Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class " +
+                "'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test-topic', " +
+                "partition=5, offset=100}.", msg);
+    }
+
+    private ProcessingContext processingContext() {
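+        // Build a context around a consumed record on partition 5 at offset 100,
+        // matching the record details asserted in the log message tests above.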
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));
+        context.currentContext(Stage.KEY_CONVERTER, JsonConverter.class);
+        return context;
+    }
+
+    private Map<String, Object> config(String key, Object val) {
+        config.put(key, val);
+        return config;
+    }
+
+    private void assertErrorHandlingMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = errorHandlingMetrics.metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
new file mode 100644
index 0000000..751510d
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.RetryWithToleranceOperatorConfig;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ProcessingContext.class})
+@PowerMockIgnore("javax.management.*")
+public class RetryWithToleranceOperatorTest {
+
+    @SuppressWarnings("unused")
+    @Mock
+    private Operation<String> mockOperation;
+
+    @Mock
+    ErrorHandlingMetrics errorHandlingMetrics;
+
+    @Test
+    public void testHandleExceptionInTransformations() {
+        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInHeaderConverter() {
+        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInValueConverter() {
+        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInKeyConverter() {
+        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPut() {
+        testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPoll() {
+        testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInTaskPut() {
+        testHandleExceptionInStage(Stage.TASK_PUT, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInTaskPoll() {
+        testHandleExceptionInStage(Stage.TASK_POLL, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInKafkaConsume() {
+        testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInKafkaProduce() {
+        testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception());
+    }
+
+    private void testHandleExceptionInStage(Stage type, Exception ex) {
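+        // Run a single failing operation through the given stage; with tolerance set to
+        // "all" the operator should record the failure rather than kill the task
+        // (except for the stages above that expect a ConnectException to propagate).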
+        RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
+        retryWithToleranceOperator.execute(new ExceptionThrower(ex), type, ExceptionThrower.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        PowerMock.verifyAll();
+    }
+
+    private RetryWithToleranceOperator setupExecutor() {
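+        // Operator with retries disabled (retry timeout 0) and tolerance "all", so
+        // handled failures mark the operator as failed instead of being rethrown.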
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "0");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        return retryWithToleranceOperator;
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorOnce() throws Exception {
+        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorThrice() throws Exception {
+        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+    }
+
+    @Test
+    public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
+        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test"));
+    }
+
+    @Test
+    public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
+        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test"));
+    }
+
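+    // Helper: the mocked operation throws the given retriable exception the requested
+    // number of times before succeeding; expectedWait is the total simulated backoff
+    // (300 ms for a single retry, 300 + 600 + 1200 = 2100 ms for three).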
+    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        EasyMock.expect(mockOperation.call()).andReturn("Success");
+
+        replay(mockOperation);
+
+        String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
+        assertFalse(retryWithToleranceOperator.failed());
+        assertEquals("Success", result);
+        assertEquals(expectedWait, time.hiResClockMs());
+
+        PowerMock.verifyAll();
+    }
+
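+    // Helper: a non-retriable exception is never retried, so no backoff time elapses
+    // and the operator is marked as failed while execAndHandleError returns null.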
+    public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        EasyMock.expect(mockOperation.call()).andReturn("Success");
+
+        replay(mockOperation);
+
+        String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        assertNull(result);
+        assertEquals(expectedWait, time.hiResClockMs());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCheckRetryLimit() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "500");
+        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "100");
+        retryWithToleranceOperator.configure(props);
+
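+        // With a retry timeout of 500 ms, retries are allowed only while the mock
+        // clock is still within the 500 ms window.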
+        time.setCurrentTimeMs(100);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(200);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(400);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(499);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(501);
+        assertFalse(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(600);
+        assertFalse(retryWithToleranceOperator.checkRetry(0));
+    }
+
+    @Test
+    public void testBackoffLimit() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "5");
+        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
+        retryWithToleranceOperator.configure(props);
+
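+        // Each backoff roughly doubles (300, 600, 1200, 2400 ms) and is then capped so
+        // that the total time slept never exceeds the 5000 ms deadline passed to backoff().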
+        long prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(1, 5000);
+        assertEquals(300, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(2, 5000);
+        assertEquals(600, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(3, 5000);
+        assertEquals(1200, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(4, 5000);
+        assertEquals(2400, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(5, 5000);
+        assertEquals(500, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(6, 5000);
+        assertEquals(0, time.hiResClockMs() - prevTs);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testToleranceLimit() {
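+        // "none" tolerates zero failures, "all" tolerates any number of failures; with
+        // no failures recorded, even "none" remains within its limits.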
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.markAsFailed();
+        assertFalse("should not tolerate any errors", retryWithToleranceOperator.withinToleranceLimits());
+
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all"));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.markAsFailed();
+        retryWithToleranceOperator.markAsFailed();
+        assertTrue("should tolerate all errors", retryWithToleranceOperator.withinToleranceLimits());
+
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        assertTrue("no tolerance is within limits if no failures", retryWithToleranceOperator.withinToleranceLimits());
+    }
+
+    @Test
+    public void testDefaultConfigs() {
+        RetryWithToleranceOperatorConfig configuration;
+        configuration = new RetryWithToleranceOperatorConfig(new HashMap<>());
+        assertEquals(configuration.retryTimeout(), 0);
+        assertEquals(configuration.retryDelayMax(), 60000);
+        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConfigs() {
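+        // The string keys below are assumed to be the values of the RETRY_TIMEOUT,
+        // RETRY_DELAY_MAX_MS and TOLERANCE_LIMIT constants used elsewhere in this test.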
+        RetryWithToleranceOperatorConfig configuration;
+        configuration = new RetryWithToleranceOperatorConfig(config("retry.timeout", "100"));
+        assertEquals(configuration.retryTimeout(), 100);
+
+        configuration = new RetryWithToleranceOperatorConfig(config("retry.delay.max.ms", "100"));
+        assertEquals(configuration.retryDelayMax(), 100);
+
+        configuration = new RetryWithToleranceOperatorConfig(config("allowed.max", "none"));
+        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+
+        PowerMock.verifyAll();
+    }
+
+    Map<String, Object> config(String key, Object val) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(key, val);
+        return configs;
+    }
+
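+    // Operation stub that always throws the supplied exception, used to drive a
+    // failure through whichever stage the test specifies.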
+    private static class ExceptionThrower implements Operation<Object> {
+        private Exception e;
+
+        public ExceptionThrower(Exception e) {
+            this.e = e;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            throw e;
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.