You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:25:34 UTC

[rocketmq-connect] branch master updated: [ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 0249c93  [ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)
0249c93 is described below

commit 0249c9386aebf5a54ff26191877cf0722f4fd181
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:25:30 2022 +0800

    [ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)
    
    * init standalone
    
    * fixed
    
    * complete standalone mode
    
    * test standalone mode
    
    * check code
    
    * fixed
    
    * Ensure the uniqueness of offset keys for different connectors #143
    
    * add sink、source retry mechanism, dead letter queue support #24
    
    * upgrade rocketmq connect JDBC plug-in #153
    
    * upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
    
    * fixed
    
    * add Decimal logical
    
    * optimize transform api
    
    * upgrade api to 0.1.3
    
    * fixed
    
    * fixed
    
    Co-authored-by: “sunxiaojian” <“sunxiaojian926@163.com”>
---
 connectors/rocketmq-connect-jdbc/pom.xml           |   2 -
 .../connect/jdbc/connector/JdbcSinkTask.java       |   5 +-
 .../runtime/connectorwrapper/TransformChain.java   |  22 +-
 .../connect/runtime/connectorwrapper/Worker.java   |  21 +-
 .../runtime/connectorwrapper/WorkerSinkTask.java   |  25 +-
 .../connectorwrapper/WorkerSinkTaskContext.java    |  17 +-
 .../runtime/connectorwrapper/WorkerSourceTask.java |  20 +-
 .../runtime/errors/DeadLetterQueueConfig.java      | 152 +++++++++++++
 .../runtime/errors/DeadLetterQueueReporter.java    | 234 +++++++++++++++++++
 .../connect/runtime/errors/ErrorReporter.java      |  76 +++++++
 .../connect/runtime/errors/LogReporter.java        |  66 ++++++
 .../rocketmq/connect/runtime/errors/Operation.java |  28 +++
 .../connect/runtime/errors/ProcessingContext.java  | 243 ++++++++++++++++++++
 .../runtime/errors/ReporterManagerUtil.java        | 105 +++++++++
 .../runtime/errors/RetryWithToleranceOperator.java | 253 +++++++++++++++++++++
 .../connect/runtime/errors/ToleranceType.java      |  41 ++++
 .../runtime/errors/WorkerErrorRecordReporter.java  |  54 +++++
 .../connect/runtime/utils/ConnectUtil.java         |  18 ++
 .../rocketmq/connect/runtime/utils/Utils.java      |  54 +++++
 .../runtime/connectorwrapper/WorkerTest.java       |  10 +-
 .../connect/runtime/rest/RestHandlerTest.java      |  15 +-
 21 files changed, 1439 insertions(+), 22 deletions(-)

diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 59d7f4b..f525cd7 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -46,8 +46,6 @@
         <junit.version>4.13.1</junit.version>
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>2.6.3</mockito.version>
-
-        <!--rocket connect api-->
         <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
         <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
 
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index 35656eb..e967849 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
 
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
@@ -42,6 +43,7 @@ public class JdbcSinkTask extends SinkTask {
 
     private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class);
     private SinkTaskContext context;
+    private ErrorRecordReporter errorRecordReporter;
     private KeyValue originalConfig;
     private JdbcSinkConfig config;
     private DatabaseDialect dialect;
@@ -73,6 +75,7 @@ public class JdbcSinkTask extends SinkTask {
                 remainingRetries--;
                 throw new RetriableException(sqlAllMessagesException);
             }
+
         }
         remainingRetries = config.getMaxRetries();
     }
@@ -86,7 +89,7 @@ public class JdbcSinkTask extends SinkTask {
         sqlAllMessagesException.setNextException(sqle);
         return sqlAllMessagesException;
     }
-
+  
     /**
      * Start the component
      *
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 63cf7d1..8308df7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,13 +23,17 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.internal.DefaultKeyValue;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
 import org.slf4j.Logger;
@@ -49,6 +53,8 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
 
     private static final String PREFIX = RuntimeConfigDefine.TRANSFORMS + "-";
 
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+
     public TransformChain(KeyValue config, Plugin plugin) {
         this.config = config;
         this.plugin = plugin;
@@ -56,6 +62,13 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
         init();
     }
 
+    /**
+     * set retryWithToleranceOperator
+     */
+    public void retryWithToleranceOperator(RetryWithToleranceOperator retryWithToleranceOperator) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+    }
+
     private void init() {
         String transformsStr = config.getString(RuntimeConfigDefine.TRANSFORMS);
         if (StringUtils.isBlank(transformsStr)) {
@@ -94,7 +107,14 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
         }
         for (final Transform<R> transform : transformList) {
             final R currentRecord = connectRecord;
-            connectRecord = transform.doTransform(currentRecord);
+            if (this.retryWithToleranceOperator == null) {
+                connectRecord = transform.doTransform(currentRecord);
+            } else {
+                connectRecord = this.retryWithToleranceOperator.execute(
+                        () -> transform.doTransform(currentRecord), ErrorReporter.Stage.TRANSFORMATION, transform.getClass()
+                );
+            }
+
             if (connectRecord == null) {
                 break;
             }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 58c98ce..c1bedaf 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -49,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -393,6 +395,7 @@ public class Worker {
         }
 
         //  STEP 2: try to create new tasks
+        int taskId = 0;
         for (String connectorName : newTasks.keySet()) {
             for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
                 String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
@@ -423,10 +426,15 @@ public class Worker {
                         Plugin.compareAndSwapLoaders(loader);
                     }
                     if (task instanceof SourceTask) {
+
                         DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+                        // create retry operator
+                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
+
                         WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
-                            (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+                            (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
                         Plugin.compareAndSwapLoaders(currentThreadLoader);
 
                         Future future = taskExecutor.submit(workerSourceTask);
@@ -434,14 +442,21 @@ public class Worker {
                         this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
                     } else if (task instanceof SinkTask) {
                         log.info("sink task config keyValue is {}", keyValue.getProperties());
-                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
+                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue, ++taskId);
                         Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
                         if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
                             ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
                         }
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+
+                        // create retry operator
+                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, connectConfig));
+
+
                         WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
-                            (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain);
+                            (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+                                retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
                         Plugin.compareAndSwapLoaders(currentThreadLoader);
                         Future future = taskExecutor.submit(workerSinkTask);
                         taskToFutureMap.put(workerSinkTask, future);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 62adc5b..125e269 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -60,6 +60,9 @@ import org.apache.rocketmq.connect.runtime.common.QueueState;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
 import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
+import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -158,6 +161,10 @@ public class WorkerSinkTask implements WorkerTask {
 
     private final TransformChain<ConnectRecord> transformChain;
 
+    private WorkerErrorRecordReporter errorRecordReporter;
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+
+
     public static final String BROKER_NAME = "brokerName";
     public static final String QUEUE_ID = "queueId";
     public static final String TOPIC = "topic";
@@ -183,7 +190,9 @@ public class WorkerSinkTask implements WorkerTask {
         AtomicReference<WorkerState> workerState,
         ConnectStatsManager connectStatsManager,
         ConnectStatsService connectStatsService,
-        TransformChain<ConnectRecord> transformChain) {
+        TransformChain<ConnectRecord> transformChain,
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        WorkerErrorRecordReporter errorRecordReporter) {
         this.connectorName = connectorName;
         this.sinkTask = sinkTask;
         this.taskConfig = taskConfig;
@@ -197,6 +206,9 @@ public class WorkerSinkTask implements WorkerTask {
         this.connectStatsService = connectStatsService;
         this.stopPullMsgLatch = new CountDownLatch(1);
         this.transformChain = transformChain;
+        this.errorRecordReporter = errorRecordReporter;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.transformChain.retryWithToleranceOperator(retryWithToleranceOperator);
     }
 
     /**
@@ -538,7 +550,9 @@ public class WorkerSinkTask implements WorkerTask {
     private void receiveMessages(List<MessageExt> messages) {
         List<ConnectRecord> sinkDataEntries = new ArrayList<>(32);
         for (MessageExt message : messages) {
-            ConnectRecord sinkDataEntry = convertToSinkDataEntry(message);
+            this.retryWithToleranceOperator.consumerRecord(message);
+            ConnectRecord sinkDataEntry = this.retryWithToleranceOperator.execute(()->convertToSinkDataEntry(message), ErrorReporter.Stage.CONVERTER, WorkerSinkTask.class);
+            if (sinkDataEntry != null && !this.retryWithToleranceOperator.failed())
             sinkDataEntries.add(sinkDataEntry);
             String msgId = message.getMsgId();
             log.info("Received one message success : msgId {}", msgId);
@@ -670,6 +684,13 @@ public class WorkerSinkTask implements WorkerTask {
         this.sinkTaskContext.resetOffset(offsets);
     }
 
+    /**
+     * error record reporter
+     * @return
+     */
+    public WorkerErrorRecordReporter errorRecordReporter() {
+        return errorRecordReporter;
+    }
 }
 
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index b17cde3..e698d1d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -19,14 +19,17 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -65,6 +68,11 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         this.consumer = consumer;
     }
 
+    @Override
+    public ErrorRecordReporter errorRecordReporter() {
+        return workerSinkTask.errorRecordReporter();
+    }
+
     @Override
     public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
         if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
@@ -172,15 +180,18 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         }
     }
 
-    @Override public Set<RecordPartition> assignment() {
+    @Override
+    public Set<RecordPartition> assignment() {
         return this.workerSinkTask.getRecordPartitions();
     }
 
-    @Override public String getConnectorName() {
+    @Override
+    public String getConnectorName() {
         return taskConfig.getString("connectorName");
     }
 
-    @Override public String getTaskName() {
+    @Override
+    public String getTaskName() {
         return taskConfig.getString("taskId");
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 0de1f6f..2a2cb07 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -51,11 +51,13 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +121,7 @@ public class WorkerSourceTask implements WorkerTask {
 
     private TransformChain<ConnectRecord> transformChain;
 
+    private RetryWithToleranceOperator retryWithToleranceOperator;
     /**
      * The property of message in WHITE_KEY_SET don't need add a connect prefix
      */
@@ -138,7 +141,8 @@ public class WorkerSourceTask implements WorkerTask {
         AtomicReference<WorkerState> workerState,
         ConnectStatsManager connectStatsManager,
         ConnectStatsService connectStatsService,
-        TransformChain<ConnectRecord> transformChain) {
+        TransformChain<ConnectRecord> transformChain,
+        RetryWithToleranceOperator retryWithToleranceOperator) {
         this.connectorName = connectorName;
         this.sourceTask = sourceTask;
         this.taskConfig = taskConfig;
@@ -151,6 +155,8 @@ public class WorkerSourceTask implements WorkerTask {
         this.connectStatsManager = connectStatsManager;
         this.connectStatsService = connectStatsService;
         this.transformChain = transformChain;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.transformChain.retryWithToleranceOperator(this.retryWithToleranceOperator);
     }
 
     /**
@@ -243,8 +249,11 @@ public class WorkerSourceTask implements WorkerTask {
             }
             List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
             for (ConnectRecord connectRecord : connectRecordList) {
+
+                retryWithToleranceOperator.sourceRecord(connectRecord);
+
                 ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
-                if (null != connectRecord1) {
+                if (null != connectRecord1 && !retryWithToleranceOperator.failed()) {
                     connectRecordList1.add(connectRecord1);
                 }
             }
@@ -258,11 +267,8 @@ public class WorkerSourceTask implements WorkerTask {
     @Override
     public void stop() {
         state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
-        try {
-            transformChain.close();
-        } catch (Exception exception) {
-            log.error("Transform close failed, {}", exception);
-        }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+        Utils.closeQuietly(transformChain, "transform chain");
         log.warn("Stop a task success.");
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java
new file mode 100644
index 0000000..0a27494
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+
+/**
+ * dead letter queue config
+ */
+public class DeadLetterQueueConfig {
+    /**
+     * dead letter queue config
+     */
+    public static final String DLQ_PREFIX = "errors.deadletterqueue.";
+    public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
+
+
+    public static final String DLQ_TOPIC_READ_QUEUE_NUMS = DLQ_PREFIX + "read.queue.nums";
+    public static final short DLQ_TOPIC_READ_QUEUE_NUMS_DEFAULT = 16;
+
+    public static final String DLQ_TOPIC_WRITE_QUEUE_NUMS = DLQ_PREFIX + "write.queue.nums";
+    public static final short DLQ_TOPIC_WRITE_QUEUE_NUMS_DEFAULT = 16;
+
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+
+    public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages";
+    public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false;
+
+    public static final String ERRORS_LOG_ENABLE_CONFIG = "errors.log.enable";
+    public static final boolean ERRORS_LOG_ENABLE_DEFAULT = false;
+
+
+    public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout";
+    public static final int ERRORS_RETRY_TIMEOUT_DEFAULT = 0;
+
+    public static final String ERRORS_RETRY_MAX_DELAY_CONFIG = "errors.retry.delay.max.ms";
+    public static final String ERRORS_RETRY_MAX_DELAY_DISPLAY = "Maximum Delay Between Retries for Errors";
+    public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000;
+
+    public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
+    public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
+
+    private ConnectKeyValue config;
+
+    /**
+     * config
+     *
+     * @param config
+     */
+    public DeadLetterQueueConfig(ConnectKeyValue config) {
+        this.config = config;
+    }
+
+
+    /**
+     * get dlq topic name
+     *
+     * @return
+     */
+    public String dlqTopicName() {
+        return config.getString(DLQ_TOPIC_NAME_CONFIG, "");
+    }
+
+
+    /**
+     * get dlq context headers enabled
+     *
+     * @return
+     */
+    public Boolean isDlqContextHeadersEnabled() {
+        return config.getProperties().containsKey(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG) ?
+                Boolean.getBoolean(config.getProperties().get(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG)) : DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT;
+    }
+
+
+    /**
+     * get dlq topic read queue nums
+     *
+     * @return
+     */
+    public Integer dlqTopicReadQueueNums() {
+        return config.getInt(DLQ_TOPIC_READ_QUEUE_NUMS, DLQ_TOPIC_READ_QUEUE_NUMS_DEFAULT);
+
+    }
+
+    /**
+     * get dlq topic write queue nums
+     *
+     * @return
+     */
+    public Integer dlqTopicWriteQueueNums() {
+        return config.getInt(DLQ_TOPIC_WRITE_QUEUE_NUMS, DLQ_TOPIC_WRITE_QUEUE_NUMS_DEFAULT);
+    }
+
+    /**
+     * include error log
+     *
+     * @return
+     */
+    public boolean includeRecordDetailsInErrorLog() {
+        return config.getProperties().containsKey(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG) ?
+                Boolean.getBoolean(config.getProperties().get(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG)) : ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT;
+    }
+
+
+    public boolean enableErrantRecordReporter() {
+        String dqlTopic = dlqTopicName();
+        boolean enableErrorLog = enableErrorLog();
+        boolean enableDqlTopic = !dqlTopic.isEmpty();
+        return enableErrorLog || enableDqlTopic;
+    }
+
+    private boolean enableErrorLog() {
+        return config.getProperties().containsKey(ERRORS_LOG_ENABLE_CONFIG) ?
+                Boolean.getBoolean(config.getProperties().get(ERRORS_LOG_ENABLE_CONFIG)) : ERRORS_LOG_ENABLE_DEFAULT;
+
+    }
+
+    public long errorRetryTimeout() {
+        return config.getLong(ERRORS_RETRY_TIMEOUT_CONFIG);
+    }
+
+    public long errorMaxDelayInMillis() {
+        return config.getLong(ERRORS_RETRY_MAX_DELAY_CONFIG);
+    }
+
+    public ToleranceType errorToleranceType() {
+        String tolerance = config.getString(ERRORS_TOLERANCE_CONFIG);
+        for (ToleranceType type : ToleranceType.values()) {
+            if (type.name().equalsIgnoreCase(tolerance)) {
+                return type;
+            }
+        }
+        return ERRORS_TOLERANCE_DEFAULT;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
new file mode 100644
index 0000000..cf8967b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * Write the original consumed record into a dead letter queue
+ */
+public class DeadLetterQueueReporter implements ErrorReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
+
+
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+    public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+    public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+    public static final String ERROR_HEADER_CLUSTER_ID = HEADER_PREFIX + "cluster.id";
+    public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+    public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
+
+    /**
+     * The configs of current source task.
+     */
+    private ConnectKeyValue config;
+
+    /**
+     * A RocketMQ producer to send message to dest MQ.
+     */
+    private DefaultMQProducer producer;
+
+    /**
+     * worker id
+     */
+    private String workerId;
+
+    /**
+     * config
+     */
+    private final DeadLetterQueueConfig deadLetterQueueConfig;
+
+    private String connectorName;
+
+
+    public void workerId(String workerId) {
+        this.workerId = workerId;
+    }
+
+
+    /**
+     * build reporter
+     *
+     * @param connectorName
+     * @param sinkConfig
+     * @param workerConfig
+     * @return
+     */
+    public static DeadLetterQueueReporter build(String connectorName,
+                                                ConnectKeyValue sinkConfig,
+                                                ConnectConfig workerConfig) {
+
+        DeadLetterQueueConfig deadLetterQueueConfig = new DeadLetterQueueConfig(sinkConfig);
+        String dlqTopic = deadLetterQueueConfig.dlqTopicName();
+        if (dlqTopic.isEmpty()) {
+            return null;
+        }
+        if (!ConnectUtil.isTopicExist(workerConfig, dlqTopic)) {
+            TopicConfig topicConfig = new TopicConfig(dlqTopic);
+            topicConfig.setReadQueueNums(deadLetterQueueConfig.dlqTopicReadQueueNums());
+            topicConfig.setWriteQueueNums(deadLetterQueueConfig.dlqTopicWriteQueueNums());
+            ConnectUtil.createTopic(workerConfig, topicConfig);
+        }
+        DefaultMQProducer dlqProducer = ConnectUtil.initDefaultMQProducer(workerConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, connectorName);
+    }
+
+    /**
+     * Initialize the dead letter queue reporter with a producer
+     *
+     * @param producer
+     * @param connConfig
+     * @param connectorName
+     */
+    DeadLetterQueueReporter(DefaultMQProducer producer,
+                            ConnectKeyValue connConfig,
+                            String connectorName) {
+        Objects.requireNonNull(producer);
+        Objects.requireNonNull(connConfig);
+        Objects.requireNonNull(connectorName);
+        this.producer = producer;
+        this.config = connConfig;
+        this.connectorName = connectorName;
+        this.deadLetterQueueConfig = new DeadLetterQueueConfig(connConfig);
+    }
+
+    /**
+     * Write the raw records into a topic
+     */
+    @Override
+    public void report(ProcessingContext context) {
+        if (this.deadLetterQueueConfig.dlqTopicName().trim().isEmpty()) {
+            return;
+        }
+        MessageExt originalMessage = context.consumerRecord();
+        if (originalMessage == null) {
+            return;
+        }
+
+        Message producerRecord = new Message();
+        producerRecord.setTopic(deadLetterQueueConfig.dlqTopicName());
+        producerRecord.setBody(originalMessage.getBody());
+
+        if (deadLetterQueueConfig.isDlqContextHeadersEnabled()) {
+            populateContextHeaders(originalMessage, context);
+        }
+
+        try {
+            producer.send(originalMessage, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult result) {
+                    log.info("Successful send error message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
+                }
+
+                @Override
+                public void onException(Throwable throwable) {
+                    log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(originalMessage), throwable);
+                }
+            });
+        } catch (MQClientException e) {
+            log.error("Send message MQClientException. message: {}, error info: {}.", producerRecord, e);
+        } catch (RemotingException e) {
+            log.error("Send message RemotingException. message: {}, error info: {}.", producerRecord, e);
+        } catch (InterruptedException e) {
+            log.error("Send message InterruptedException. message: {}, error info: {}.", producerRecord, e);
+            throw new ConnectException(e);
+        }
+    }
+
+
+    /**
+     * pop context property
+     *
+     * @param producerRecord
+     * @param context
+     */
+    void populateContextHeaders(Message producerRecord, ProcessingContext context) {
+        Map<String, String> headers = producerRecord.getProperties();
+        if (context.consumerRecord() != null) {
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_TOPIC, context.consumerRecord().getTopic());
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_PARTITION, String.valueOf(context.consumerRecord().getQueueId()));
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_OFFSET, String.valueOf(context.consumerRecord().getQueueOffset()));
+        }
+        if (workerId != null) {
+            producerRecord.putUserProperty(ERROR_HEADER_CLUSTER_ID, workerId);
+        }
+        producerRecord.putUserProperty(ERROR_HEADER_STAGE, context.stage().name());
+        producerRecord.putUserProperty(ERROR_HEADER_EXECUTING_CLASS, context.executingClass().getName());
+        producerRecord.putUserProperty(ERROR_HEADER_CONNECTOR_NAME, connectorName);
+        if (context.error() != null) {
+            Throwable error = context.error();
+            headers.put(ERROR_HEADER_EXCEPTION, error.getClass().getName());
+            headers.put(ERROR_HEADER_EXCEPTION_MESSAGE, error.getMessage());
+            byte[] trace;
+            if ((trace = stacktrace(context.error())) != null) {
+                headers.put(ERROR_HEADER_EXCEPTION_STACK_TRACE, new String(trace));
+            }
+        }
+    }
+
+    private byte[] stacktrace(Throwable error) {
+        if (error == null) {
+            return null;
+        }
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            PrintStream stream = new PrintStream(bos, true, "UTF-8");
+            error.printStackTrace(stream);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("Could not serialize stacktrace.", e);
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        if (producer != null) {
+            producer.shutdown();
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
new file mode 100644
index 0000000..2125fc6
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+
+/**
+ * error reporter interface
+ */
+public interface ErrorReporter extends AutoCloseable {
+
+    /**
+     * report message
+     *
+     * @param context
+     */
+    void report(ProcessingContext context);
+
+    /**
+     * close reporters
+     */
+    @Override
+    default void close() {
+    }
+
+
+    /**
+     * A logical stage in a Connect pipeline.
+     */
+    enum Stage {
+
+        /**
+         * source poll
+         */
+        TASK_POLL,
+
+        /**
+         * sink put
+         */
+        TASK_PUT,
+
+        /**
+         * transform record
+         */
+        TRANSFORMATION,
+
+        /**
+         * converter record
+         */
+        CONVERTER,
+
+        /**
+         * When producing to rocketmq topic
+         */
+        PRODUCE,
+
+        /**
+         * When consuming from a rocketmq topic
+         */
+        CONSUME
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java
new file mode 100644
index 0000000..2f4113b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Writes errors and their context to application logs.
+ */
+public class LogReporter implements ErrorReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(LogReporter.class);
+
+    private final String connectName;
+    private final DeadLetterQueueConfig deadLetterQueueConfig;
+
+    public LogReporter(String connectorName,
+                       ConnectKeyValue sinkConfig) {
+        Objects.requireNonNull(connectorName);
+        Objects.requireNonNull(sinkConfig);
+
+        this.connectName = connectorName;
+        this.deadLetterQueueConfig = new DeadLetterQueueConfig(sinkConfig);
+    }
+
+    /**
+     * Log error context.
+     *
+     * @param context the processing context.
+     */
+    @Override
+    public void report(ProcessingContext context) {
+        log.error(message(context), context.error());
+    }
+
+    /**
+     * format error message
+     *
+     * @param context
+     * @return
+     */
+    String message(ProcessingContext context) {
+        return String.format("Error encountered in task %s. %s", String.valueOf(connectName),
+                context.toString(deadLetterQueueConfig.includeRecordDetailsInErrorLog()));
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java
new file mode 100644
index 0000000..ac28b3a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.errors;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A recoverable operation evaluated in the connector pipeline.
+ *
+ * @param <V> return type of the result of the operation.
+ */
+public interface Operation<V> extends Callable<V> {
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
new file mode 100644
index 0000000..644f024
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * contains all info current time
+ */
+class ProcessingContext implements AutoCloseable {
+
+    /**
+     * reporters
+     */
+    private Collection<ErrorReporter> reporters = Collections.emptyList();
+
+    /**
+     * send message
+     */
+    private MessageExt consumedMessage;
+
+    /**
+     * original source record
+     */
+    private ConnectRecord sourceRecord;
+
+    /**
+     * stage
+     */
+    private ErrorReporter.Stage stage;
+    private Class<?> klass;
+
+    /**
+     * attempt
+     */
+    private int attempt;
+    /**
+     * error message
+     */
+    private Throwable error;
+
+    /**
+     * reset info
+     */
+    private void reset() {
+        attempt = 0;
+        stage = null;
+        klass = null;
+        error = null;
+    }
+
+    /**
+     * Set the record consumed from Kafka in a sink connector.
+     *
+     * @param consumedMessage the record
+     */
+    public void consumerRecord(MessageExt consumedMessage) {
+        this.consumedMessage = consumedMessage;
+        reset();
+    }
+
+    /**
+     * @return the record consumed from Kafka. could be null
+     */
+    public MessageExt consumerRecord() {
+        return consumedMessage;
+    }
+
+    /**
+     * @return the source record being processed.
+     */
+    public ConnectRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param record the source record
+     */
+    public void sourceRecord(ConnectRecord record) {
+        this.sourceRecord = record;
+        reset();
+    }
+
+    /**
+     * Set the stage in the connector pipeline which is currently executing.
+     *
+     * @param stage the stage
+     */
+    public void stage(ErrorReporter.Stage stage) {
+        this.stage = stage;
+    }
+
+    /**
+     * @return the stage in the connector pipeline which is currently executing.
+     */
+    public ErrorReporter.Stage stage() {
+        return stage;
+    }
+
+    /**
+     * @return the class which is going to execute the current operation.
+     */
+    public Class<?> executingClass() {
+        return klass;
+    }
+
+    /**
+     * @param klass set the class which is currently executing.
+     */
+    public void executingClass(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * A helper method to set both the stage and the class.
+     *
+     * @param stage the stage
+     * @param klass the class which will execute the operation in this stage.
+     */
+    public void currentContext(ErrorReporter.Stage stage, Class<?> klass) {
+        stage(stage);
+        executingClass(klass);
+    }
+
+
+    /**
+     * report errors
+     */
+    public void report() {
+        if (reporters.size() == 1) {
+            reporters.iterator().next().report(this);
+        }
+        reporters.stream().forEach(r -> r.report(this));
+    }
+
+
+    /**
+     * @param attempt the number of attempts made to execute the current operation.
+     */
+    public void attempt(int attempt) {
+        this.attempt = attempt;
+    }
+
+    public int attempt() {
+        return attempt;
+    }
+
+
+    public Throwable error() {
+        return error;
+    }
+
+    /**
+     * set error
+     *
+     * @param error
+     */
+    public void error(Throwable error) {
+        this.error = error;
+    }
+
+
+    /**
+     * @return
+     */
+    public boolean failed() {
+        return error() != null;
+    }
+
+
+    /**
+     * set reporters
+     *
+     * @param reporters
+     */
+    public void reporters(Collection<ErrorReporter> reporters) {
+        Objects.requireNonNull(reporters);
+        this.reporters = reporters;
+    }
+
+    @Override
+    public void close() {
+        ConnectException e = null;
+        for (ErrorReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Throwable t) {
+                e = e != null ? e : new ConnectException("Failed to close all reporters");
+                e.addSuppressed(t);
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
+
+    public String toString(boolean includeMessage) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Executing stage '");
+        builder.append(stage().name());
+        builder.append("' with class '");
+        builder.append(executingClass() == null ? "null" : executingClass().getName());
+        builder.append('\'');
+        if (includeMessage && sourceRecord() != null) {
+            builder.append(", where source record is = ");
+            builder.append(sourceRecord());
+        } else if (includeMessage && consumerRecord() != null) {
+            MessageExt msg = consumerRecord();
+            builder.append(", where consumed record is ");
+            builder.append("{topic='").append(consumedMessage.getTopic()).append('\'');
+            builder.append(", partition=").append(msg.getQueueId());
+            builder.append(", offset=").append(msg.getQueueOffset());
+            builder.append(", bornTimestamp=").append(msg.getBornTimestamp());
+            builder.append(", storeTimestamp=").append(msg.getStoreTimestamp());
+            builder.append("}");
+        }
+        builder.append('.');
+        return builder.toString();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java
new file mode 100644
index 0000000..6c4e30a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * reporter manage util
+ */
+public class ReporterManagerUtil {
+
+    /**
+     * create retry operator
+     *
+     * @param connConfig
+     * @return
+     */
+    public static RetryWithToleranceOperator createRetryWithToleranceOperator(ConnectKeyValue connConfig) {
+        DeadLetterQueueConfig deadLetterQueueConfig = new DeadLetterQueueConfig(connConfig);
+        return new RetryWithToleranceOperator(
+                deadLetterQueueConfig.errorRetryTimeout(),
+                deadLetterQueueConfig.errorMaxDelayInMillis(),
+                deadLetterQueueConfig.errorToleranceType()
+        );
+    }
+
+    /**
+     * create worker error record reporter
+     *
+     * @param connConfig
+     * @param retryWithToleranceOperator
+     * @param converter
+     * @return
+     */
+    public static WorkerErrorRecordReporter createWorkerErrorRecordReporter(
+            ConnectKeyValue connConfig,
+            RetryWithToleranceOperator retryWithToleranceOperator,
+            Converter converter) {
+        DeadLetterQueueConfig deadLetterQueueConfig = new DeadLetterQueueConfig(connConfig);
+        if (deadLetterQueueConfig.enableErrantRecordReporter()) {
+            return new WorkerErrorRecordReporter(retryWithToleranceOperator, converter);
+        }
+        return null;
+    }
+
+    /**
+     * build sink task reporter
+     *
+     * @param connectName
+     * @param connConfig
+     * @param workerConfig
+     * @return
+     */
+    public static List<ErrorReporter> sinkTaskReporters(String connectName,
+                                                        ConnectKeyValue connConfig,
+                                                        ConnectConfig workerConfig) {
+        // ensure reporter order
+        ArrayList<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(connectName, connConfig);
+        reporters.add(logReporter);
+
+        // dead letter queue reporter
+        DeadLetterQueueReporter reporter = DeadLetterQueueReporter.build(connectName, connConfig, workerConfig);
+        if (reporter != null) {
+            reporters.add(reporter);
+        }
+        return reporters;
+    }
+
+    /**
+     * build source task reporter
+     *
+     * @param connectName
+     * @param connConfig
+     * @return
+     */
+    public static List<ErrorReporter> sourceTaskReporters(String connectName,
+                                                          ConnectKeyValue connConfig) {
+        List<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(connectName, connConfig);
+        reporters.add(logReporter);
+
+        return reporters;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
new file mode 100644
index 0000000..b0fa283
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * retry operator
+ */
+public class RetryWithToleranceOperator implements AutoCloseable {
+
+    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
+
+    public static final long RETRIES_DELAY_MIN_MS = 300;
+
+    private static final Map<ErrorReporter.Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
+
+    static {
+        TOLERABLE_EXCEPTIONS.put(ErrorReporter.Stage.TRANSFORMATION, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(ErrorReporter.Stage.CONVERTER, Exception.class);
+    }
+
+    private final long retryTimeout;
+    private final long maxDelayInMillis;
+    private final ToleranceType toleranceType;
+
+    private long totalFailures = 0;
+    protected ProcessingContext context = new ProcessingContext();
+
+    public RetryWithToleranceOperator(long errorRetryTimeout,
+                                      long maxDelayInMillis,
+                                      ToleranceType toleranceType) {
+        this.retryTimeout = errorRetryTimeout;
+        this.maxDelayInMillis = maxDelayInMillis;
+        this.toleranceType = toleranceType;
+    }
+
+    public void executeFailed(ErrorReporter.Stage stage,
+                              Class<?> executingClass,
+                              MessageExt consumerRecord,
+                              Throwable error) {
+
+        markAsFailed();
+        context.consumerRecord(consumerRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        context.report();
+        if (!withinToleranceLimits()) {
+            throw new ConnectException("Tolerance exceeded in error handler", error);
+        }
+    }
+
+    /**
+     * Execute the recoverable operation. If the operation is already in a failed state, then simply return
+     * with the existing failure.
+     */
+    public <V> V execute(Operation<V> operation, ErrorReporter.Stage stage, Class<?> executingClass) {
+        context.currentContext(stage, executingClass);
+        if (context.failed()) {
+            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
+            return null;
+        }
+
+        try {
+            Class<? extends Exception> ex = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
+            return execAndHandleError(operation, ex);
+        } finally {
+            if (context.failed()) {
+                context.report();
+            }
+        }
+    }
+
+    /**
+     * Attempt to execute an operation.
+     */
+    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
+        int attempt = 0;
+        long startTime = System.currentTimeMillis();
+        long deadline = startTime + retryTimeout;
+        do {
+            try {
+                attempt++;
+                return operation.call();
+            } catch (RetriableException e) {
+                log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
+                if (checkRetry(startTime)) {
+                    backoff(attempt, deadline);
+                    if (Thread.currentThread().isInterrupted()) {
+                        log.trace("Thread was interrupted. Marking operation as failed.");
+                        context.error(e);
+                        return null;
+                    }
+                } else {
+                    log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
+                    context.error(e);
+                    return null;
+                }
+            } finally {
+                context.attempt(attempt);
+            }
+        } while (true);
+    }
+
+    /**
+     * Execute a given operation multiple times (if needed), and tolerate certain exceptions.
+     */
+    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
+        try {
+            V result = execAndRetry(operation);
+            if (context.failed()) {
+                markAsFailed();
+            }
+            return result;
+        } catch (Exception e) {
+            markAsFailed();
+            context.error(e);
+
+            if (!tolerated.isAssignableFrom(e.getClass())) {
+                throw new ConnectException("Unhandled exception in error handler", e);
+            }
+
+            if (!withinToleranceLimits()) {
+                throw new ConnectException("Tolerance exceeded in error handler", e);
+            }
+            return null;
+        }
+    }
+
+    private void markAsFailed() {
+        totalFailures++;
+    }
+
+    public boolean withinToleranceLimits() {
+        switch (toleranceType) {
+            case NONE:
+                if (totalFailures > 0) {
+                    return false;
+                }
+            case ALL:
+                return true;
+            default:
+                throw new ConnectException("Unknown tolerance type: " + toleranceType);
+        }
+    }
+
+    boolean checkRetry(long startTime) {
+        return (System.currentTimeMillis() - startTime) < retryTimeout;
+    }
+
+    void backoff(int attempt, long deadline) {
+        int numRetry = attempt - 1;
+        long delay = RETRIES_DELAY_MIN_MS << numRetry;
+        if (delay > maxDelayInMillis) {
+            delay = ThreadLocalRandom.current().nextLong(maxDelayInMillis);
+        }
+        if (delay + System.currentTimeMillis() > deadline) {
+            delay = deadline - System.currentTimeMillis();
+        }
+        log.debug("Sleeping for {} millis", delay);
+        Utils.sleep(delay);
+    }
+
+    /**
+     * Set the error reporters for this connector.
+     *
+     * @param reporters the error reporters (should not be null).
+     */
+    public void reporters(List<ErrorReporter> reporters) {
+        this.context.reporters(reporters);
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param preTransformRecord the source record
+     */
+    public void sourceRecord(ConnectRecord preTransformRecord) {
+        this.context.sourceRecord(preTransformRecord);
+    }
+
+
+    /**
+     * Set the record consumed rocketmq in a sink
+     *
+     * @param messageExt
+     */
+    public void consumerRecord(MessageExt messageExt) {
+        this.context.consumerRecord(messageExt);
+    }
+
+    /**
+     * failed
+     *
+     * @return
+     */
+    public boolean failed() {
+        return this.context.failed();
+    }
+
+    /**
+     * error
+     *
+     * @return
+     */
+    public Throwable error() {
+        return this.context.error();
+    }
+
+
+    @Override
+    public void close() {
+        this.context.close();
+    }
+
+
+    @Override
+    public String toString() {
+        return "RetryWithToleranceOperator{" +
+                "retryTimeout=" + retryTimeout +
+                ", errorMaxDelayInMillis=" + maxDelayInMillis +
+                ", errorToleranceType=" + toleranceType +
+                ", totalFailures=" + totalFailures +
+                ", context=" + context +
+                '}';
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java
new file mode 100644
index 0000000..53c3d65
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import java.util.Locale;
+
+/**
+ * The different levels of error tolerance.
+ */
+public enum ToleranceType {
+
+    /**
+     * Tolerate no errors.
+     */
+    NONE,
+
+    /**
+     * Tolerate all errors.
+     */
+    ALL;
+
+    public String value() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
new file mode 100644
index 0000000..59533aa
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * worker error record reporter
+ */
+public class WorkerErrorRecordReporter implements ErrorRecordReporter {
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter converter;
+
+    public WorkerErrorRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator,
+                                     Converter converter) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.converter = converter;
+    }
+
+    /**
+     * report record
+     *
+     * @param record
+     * @param error
+     * @return
+     */
+    @Override
+    public void report(ConnectRecord record, Throwable error) {
+        byte[] value = converter.objectToByte(record.getData());
+        MessageExt consumerRecord = new MessageExt();
+        consumerRecord.setBody(value);
+        retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index b7f9c5f..5f7ad28 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -288,6 +288,24 @@ public class ConnectUtil {
         return recordPartition;
     }
 
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue, Integer taskId) {
+        RPCHook rpcHook = null;
+        if (connectConfig.getAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
+        consumer.setInstanceName(createInstance(connectorName.concat("-").concat(taskId.toString())));
+        String taskGroupId = keyValue.getString("task-group-id");
+        if (StringUtils.isNotBlank(taskGroupId)) {
+            consumer.setConsumerGroup(taskGroupId);
+        } else {
+            consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + connectorName);
+        }
+        if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
+            consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        }
+        return consumer;
+    }
 
     public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue) {
         RPCHook rpcHook = null;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
new file mode 100644
index 0000000..60f1b5b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  common utils
+ */
+public class Utils {
+
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+    /**
+     * Sleep for a bit
+     * @param ms The duration of the sleep
+     */
+    public static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            // this is okay, we just wake up early
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
+     */
+    public static void closeQuietly(AutoCloseable closeable, String name) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                log.warn("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
+            }
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 3a04098..0c98f12 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -38,6 +38,8 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
@@ -110,6 +112,11 @@ public class WorkerTest {
             ConnectKeyValue connectKeyValue = new ConnectKeyValue();
             connectKeyValue.getProperties().put("key1", "TEST-TASK-" + i + "1");
             connectKeyValue.getProperties().put("key2", "TEST-TASK-" + i + "2");
+
+            // create retry operator
+            RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+            retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-" + i, connectKeyValue));
+
             runnables.add(new WorkerSourceTask("TEST-CONN-" + i,
                 new TestSourceTask(),
                 connectKeyValue,
@@ -118,7 +125,8 @@ public class WorkerTest {
                 producer,
                 new AtomicReference(WorkerState.STARTED),
                 connectStatsManager, connectStatsService,
-                transformChain));
+                transformChain,
+                retryWithToleranceOperator));
         }
         worker.setWorkingTasks(runnables);
         assertThat(worker.getWorkingTasks().size()).isEqualTo(3);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index b452e6c..509f609 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -49,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
@@ -206,8 +208,17 @@ public class RestHandlerTest {
             }
         };
         TransformChain<ConnectRecord> transformChain = new TransformChain<ConnectRecord>(new DefaultKeyValue(), new Plugin(new ArrayList<>()));
-        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
-        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+        // create retry operator
+        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
+
+        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+
+        // create retry operator
+        RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+        retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
+
+        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
         workerTasks = new HashSet<Runnable>() {
             {
                 add(workerSourceTask1);