You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/28 05:48:04 UTC

[rocketmq-connect] branch master updated: [ISSUE #183] Optimize worker task (#201)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ef4e5  [ISSUE #183] Optimize worker task (#201)
26ef4e5 is described below

commit 26ef4e52c3178c2e06e6ac0f4ea16afb5aa5db7b
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Jul 28 13:47:59 2022 +0800

    [ISSUE #183] Optimize worker task (#201)
    
    * Optimize WorkerSourceTask and WorkerSinkTask #183
    
    * Optimize WorkerSourceTask and WorkerSinkTask #183
    
    * Optimize worker #183
    
    * fixed
    
    * fixed
    
    * format code
    
    * fixed
    
    * fixed WorkerTask #183
    
    * Fix debezium demecial type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
    
    * Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * remove local config
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * Ensure the orderly submission of offset #183
    
    * fixed
    
    * Optimize WorkerDirectTask #183
    
    * fixed  #183
    
    * test offset commit #183
    
    * fixed #183
    
    * fixed task id #183
    
    * fixed
    
    * upgrade fastjson
    
    * reformat code #183
    
    * Whether the same task is assigned to different worker nodes with different connectorTaskId [fixed]
    
    * fixed
    
    * Rocketmq replicator running null pointer #205
---
 connectors/rocketmq-connect-jdbc/pom.xml           |   7 +-
 pom.xml                                            |   4 +-
 .../connect/runtime/config/ConnectConfig.java      | 121 +++-
 .../runtime/config/SinkConnectorConfig.java        |   2 -
 .../connectorwrapper/RecordOffsetManagement.java   | 285 +++++++++
 .../SourceTaskOffsetCommitter.java                 | 113 ++++
 .../connect/runtime/connectorwrapper/Worker.java   | 540 +++++++++--------
 .../runtime/connectorwrapper/WorkerDirectTask.java | 410 +++++++------
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 500 +++++++--------
 .../connectorwrapper/WorkerSinkTaskContext.java    |   2 +-
 .../runtime/connectorwrapper/WorkerSourceTask.java | 673 +++++++++++++--------
 .../connectorwrapper/WorkerSourceTaskContext.java  |  60 ++
 .../runtime/connectorwrapper/WorkerTask.java       | 184 +++++-
 .../connect/runtime/errors/ErrorReporter.java      |   6 +
 .../runtime/errors/RetryWithToleranceOperator.java |  18 +
 .../rocketmq/connect/runtime/rest/RestHandler.java |   2 +-
 .../service/AbstractConfigManagementService.java   |  80 +++
 .../service/ConfigManagementServiceImpl.java       |  43 +-
 .../service/PositionManagementServiceImpl.java     |   1 -
 .../memory/MemoryConfigManagementServiceImpl.java  |  49 +-
 .../runtime/store/PositionStorageWriter.java       | 156 ++++-
 .../connect/runtime/utils/ConnectUtil.java         |   6 +-
 .../connect/runtime/utils/ConnectorTaskId.java     |  83 +++
 .../connect/runtime/utils/CurrentTaskState.java    |  79 +++
 .../runtime/connectorwrapper/WorkerTest.java       |   7 +-
 .../connect/runtime/rest/RestHandlerTest.java      |   5 +-
 26 files changed, 2326 insertions(+), 1110 deletions(-)

diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index c4aed33..7617043 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -40,7 +40,7 @@
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <rocketmq.version>4.5.2</rocketmq.version>
+        <rocketmq.version>4.7.0</rocketmq.version>
 
         <!--test jar-->
         <junit.version>4.13.1</junit.version>
@@ -285,11 +285,6 @@
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>3.0.0</version>
                 <configuration>
-                    <archive>
-                        <manifest>
-                            <mainClass>org.apache.rocketmq.connect.redis.connector.RedisSourceConnector</mainClass>
-                        </manifest>
-                    </archive>
                     <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                     </descriptorRefs>
diff --git a/pom.xml b/pom.xml
index cb8b222..64b719a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,12 +38,12 @@
         </license>
     </licenses>
     <properties>
-        <rocketmq.version>4.4.0</rocketmq.version>
+        <rocketmq.version>4.7.0</rocketmq.version>
         <junit.version>4.13.1</junit.version>
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>3.2.4</mockito.version>
         <httpclient.version>4.5.13</httpclient.version>
-        <openmessaging.connector.version>0.1.3</openmessaging.connector.version>
+        <openmessaging.connector.version>0.1.4-SNAPSHOT</openmessaging.connector.version>
         <fastjson.version>1.2.83</fastjson.version>
         <javalin.version>2.8.0</javalin.version>
         <slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 45bf3ac..8710cbb 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -17,11 +17,12 @@
 
 package org.apache.rocketmq.connect.runtime.config;
 
-import java.io.File;
 import org.apache.rocketmq.common.MixAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+
 import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_RUNTIME;
 
 /**
@@ -69,6 +70,16 @@ public class ConnectConfig {
 
     private int rmqMinConsumeThreadNums = 1;
 
+    // task start timeout mills, default 3 minute
+    private long maxStartTimeoutMills = 1000 * 60 * 3;
+
+    // task stop timeout mills, default 1 minute
+    private long maxStopTimeoutMills = 1000 * 60;
+
+    // offset commit timeout
+    private long offsetCommitTimeoutMsConfig = 1000 * 30;
+
+
     public int getBrokerSuspendMaxTimeMillis() {
         return brokerSuspendMaxTimeMillis;
     }
@@ -137,6 +148,12 @@ public class ConnectConfig {
 
     private String adminExtGroup = "connector-admin-group";
 
+    /**
+     * offset commit interval ms
+     */
+    private long offsetCommitIntervalMs = 5000L;
+
+
     public String getWorkerId() {
         return workerId;
     }
@@ -369,38 +386,76 @@ public class ConnectConfig {
         this.connectHome = connectHome;
     }
 
-    @Override public String toString() {
+
+    public long getOffsetCommitIntervalMs() {
+        return offsetCommitIntervalMs;
+    }
+
+    public void setOffsetCommitIntervalMs(long offsetCommitIntervalMs) {
+        this.offsetCommitIntervalMs = offsetCommitIntervalMs;
+    }
+
+    public long getMaxStartTimeoutMills() {
+        return maxStartTimeoutMills;
+    }
+
+    public void setMaxStartTimeoutMills(long maxStartTimeoutMills) {
+        this.maxStartTimeoutMills = maxStartTimeoutMills;
+    }
+
+    public long getMaxStopTimeoutMills() {
+        return maxStopTimeoutMills;
+    }
+
+    public void setMaxStopTimeoutMills(long maxStopTimeoutMills) {
+        this.maxStopTimeoutMills = maxStopTimeoutMills;
+    }
+
+    public long getOffsetCommitTimeoutMsConfig() {
+        return offsetCommitTimeoutMsConfig;
+    }
+
+    public void setOffsetCommitTimeoutMsConfig(long offsetCommitTimeoutMsConfig) {
+        this.offsetCommitTimeoutMsConfig = offsetCommitTimeoutMsConfig;
+    }
+
+    @Override
+    public String toString() {
         return "ConnectConfig{" +
-            "workerId='" + workerId + '\'' +
-            ", storePathRootDir='" + storePathRootDir + '\'' +
-            ", connectHome='" + connectHome + '\'' +
-            ", namesrvAddr='" + namesrvAddr + '\'' +
-            ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
-            ", maxMessageSize=" + maxMessageSize +
-            ", operationTimeout=" + operationTimeout +
-            ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
-            ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
-            ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
-            ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
-            ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
-            ", brokerSuspendMaxTimeMillis=" + brokerSuspendMaxTimeMillis +
-            ", clusterStoreTopic='" + clusterStoreTopic + '\'' +
-            ", configStoreTopic='" + configStoreTopic + '\'' +
-            ", positionStoreTopic='" + positionStoreTopic + '\'' +
-            ", offsetStoreTopic='" + offsetStoreTopic + '\'' +
-            ", httpPort=" + httpPort +
-            ", positionPersistInterval=" + positionPersistInterval +
-            ", offsetPersistInterval=" + offsetPersistInterval +
-            ", configPersistInterval=" + configPersistInterval +
-            ", pluginPaths='" + pluginPaths + '\'' +
-            ", connectClusterId='" + connectClusterId + '\'' +
-            ", allocTaskStrategy='" + allocTaskStrategy + '\'' +
-            ", aclEnable=" + aclEnable +
-            ", accessKey='" + accessKey + '\'' +
-            ", secretKey='" + secretKey + '\'' +
-            ", autoCreateGroupEnable=" + autoCreateGroupEnable +
-            ", clusterName='" + clusterName + '\'' +
-            ", adminExtGroup='" + adminExtGroup + '\'' +
-            '}';
+                "workerId='" + workerId + '\'' +
+                ", storePathRootDir='" + storePathRootDir + '\'' +
+                ", connectHome='" + connectHome + '\'' +
+                ", namesrvAddr='" + namesrvAddr + '\'' +
+                ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
+                ", maxMessageSize=" + maxMessageSize +
+                ", operationTimeout=" + operationTimeout +
+                ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
+                ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
+                ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
+                ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
+                ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
+                ", maxStartTimeoutMills=" + maxStartTimeoutMills +
+                ", maxStopTimeoutMills=" + maxStopTimeoutMills +
+                ", offsetCommitTimeoutMsConfig=" + offsetCommitTimeoutMsConfig +
+                ", brokerSuspendMaxTimeMillis=" + brokerSuspendMaxTimeMillis +
+                ", clusterStoreTopic='" + clusterStoreTopic + '\'' +
+                ", configStoreTopic='" + configStoreTopic + '\'' +
+                ", positionStoreTopic='" + positionStoreTopic + '\'' +
+                ", offsetStoreTopic='" + offsetStoreTopic + '\'' +
+                ", httpPort=" + httpPort +
+                ", positionPersistInterval=" + positionPersistInterval +
+                ", offsetPersistInterval=" + offsetPersistInterval +
+                ", configPersistInterval=" + configPersistInterval +
+                ", pluginPaths='" + pluginPaths + '\'' +
+                ", connectClusterId='" + connectClusterId + '\'' +
+                ", allocTaskStrategy='" + allocTaskStrategy + '\'' +
+                ", aclEnable=" + aclEnable +
+                ", accessKey='" + accessKey + '\'' +
+                ", secretKey='" + secretKey + '\'' +
+                ", autoCreateGroupEnable=" + autoCreateGroupEnable +
+                ", clusterName='" + clusterName + '\'' +
+                ", adminExtGroup='" + adminExtGroup + '\'' +
+                ", offsetCommitIntervalMs=" + offsetCommitIntervalMs +
+                '}';
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
index f533093..b513078 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
@@ -29,8 +29,6 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 
 public class SinkConnectorConfig extends ConnectConfig {
 
-
-
     public static Set<String> parseTopicList(ConnectKeyValue taskConfig) {
         String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
         if (StringUtils.isBlank(messageQueueStr)) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java
new file mode 100644
index 0000000..c17ade1
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagement.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * record offset management
+ */
+class RecordOffsetManagement {
+
+    private static final Logger log = LoggerFactory.getLogger(RecordOffsetManagement.class);
+
+    final Map<RecordPartition, Deque<SubmittedPosition>> records = new HashMap<>();
+    private AtomicInteger numUnackedMessages = new AtomicInteger(0);
+    private CountDownLatch messageDrainLatch;
+
+    public RecordOffsetManagement() {
+    }
+
+    /**
+     * submit record
+     *
+     * @param position
+     * @return
+     */
+    public SubmittedPosition submitRecord(RecordPosition position) {
+        SubmittedPosition submittedPosition = new SubmittedPosition(position);
+        records.computeIfAbsent(position.getPartition(), e -> new LinkedList<>()).add(submittedPosition);
+        // ensure thread safety in operation
+        synchronized (this) {
+            numUnackedMessages.incrementAndGet();
+        }
+        return submittedPosition;
+    }
+
+    /**
+     * await all messages
+     *
+     * @param timeout
+     * @param timeUnit
+     * @return
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
+            this.messageDrainLatch = messageDrainLatch;
+        }
+        try {
+            return messageDrainLatch.await(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
+    /**
+     * @param submittedPositions
+     * @return
+     */
+    private RecordOffset pollOffsetWhile(Deque<SubmittedPosition> submittedPositions) {
+        RecordOffset offset = null;
+        // Stop pulling if there is an uncommitted breakpoint
+        while (canCommitHead(submittedPositions)) {
+            offset = submittedPositions.poll().getPosition().getOffset();
+        }
+        return offset;
+    }
+
+    private boolean canCommitHead(Deque<SubmittedPosition> submittedPositions) {
+        return submittedPositions.peek() != null && submittedPositions.peek().getAcked();
+    }
+
+    public CommittableOffsets committableOffsets() {
+        Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+        int totalCommittableMessages = 0;
+        int totalUncommittableMessages = 0;
+        int largestDequeSize = 0;
+        RecordPartition largestDequePartition = null;
+        for (Map.Entry<RecordPartition, Deque<SubmittedPosition>> entry : records.entrySet()) {
+            RecordPartition partition = entry.getKey();
+            Deque<SubmittedPosition> queuedRecords = entry.getValue();
+            int initialDequeSize = queuedRecords.size();
+            if (canCommitHead(queuedRecords)) {
+                RecordOffset offset = pollOffsetWhile(queuedRecords);
+                offsets.put(partition, offset);
+            }
+            // uncommited messages
+            int uncommittableMessages = queuedRecords.size();
+            // committed messages
+            int committableMessages = initialDequeSize - uncommittableMessages;
+
+            // calc total
+            totalCommittableMessages += committableMessages;
+            totalUncommittableMessages += uncommittableMessages;
+
+            if (uncommittableMessages > largestDequeSize) {
+                largestDequeSize = uncommittableMessages;
+                largestDequePartition = partition;
+            }
+        }
+        // Clear out all empty deques from the map to keep it from growing indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
+    }
+
+
+    // Synchronize in order to ensure that the number of unacknowledged messages isn't modified in the middle of a call
+    // to awaitAllMessages (which might cause us to decrement first, then create a new message drain latch, then count down
+    // that latch here, effectively double-acking the message)
+    private synchronized void messageAcked() {
+        numUnackedMessages.decrementAndGet();
+        if (messageDrainLatch != null) {
+            messageDrainLatch.countDown();
+        }
+    }
+
+    public class SubmittedPosition {
+        private final RecordPosition position;
+        private final AtomicBoolean acked;
+
+        public SubmittedPosition(RecordPosition position) {
+            this.position = position;
+            acked = new AtomicBoolean(false);
+        }
+
+
+        /**
+         * Acknowledge this record; signals that its offset may be safely committed.
+         */
+        public void ack() {
+            if (this.acked.compareAndSet(false, true)) {
+                messageAcked();
+            }
+        }
+
+        /**
+         * remove record
+         *
+         * @return
+         */
+        public boolean remove() {
+            Deque<SubmittedPosition> deque = records.get(position.getPartition());
+            if (deque == null) {
+                return false;
+            }
+            boolean result = deque.removeLastOccurrence(this);
+            if (deque.isEmpty()) {
+                records.remove(position.getPartition());
+            }
+            if (result) {
+                messageAcked();
+            } else {
+                log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", position.getPartition());
+            }
+            return result;
+        }
+
+        public RecordPosition getPosition() {
+            return position;
+        }
+
+        public Boolean getAcked() {
+            return acked.get();
+        }
+    }
+
+
+    /**
+     * Contains a snapshot of offsets that can be committed for a source task and metadata for that offset commit
+     * (such as the number of messages for which offsets can and cannot be committed).
+     */
+    static class CommittableOffsets {
+
+        /**
+         * An "empty" snapshot that contains no offsets to commit and whose metadata contains no committable or uncommitable messages.
+         */
+        public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
+
+        private final Map<RecordPartition, RecordOffset> offsets;
+        private final RecordPartition largestDequePartition;
+        private final int numCommittableMessages;
+        private final int numUncommittableMessages;
+        private final int numDeques;
+        private final int largestDequeSize;
+
+
+        CommittableOffsets(
+                Map<RecordPartition, RecordOffset> offsets,
+                int numCommittableMessages,
+                int numUncommittableMessages,
+                int numDeques,
+                int largestDequeSize,
+                RecordPartition largestDequePartition
+        ) {
+            this.offsets = offsets != null ? new HashMap<>(offsets) : Collections.emptyMap();
+            this.numCommittableMessages = numCommittableMessages;
+            this.numUncommittableMessages = numUncommittableMessages;
+            this.numDeques = numDeques;
+            this.largestDequeSize = largestDequeSize;
+            this.largestDequePartition = largestDequePartition;
+        }
+
+        public Map<RecordPartition, RecordOffset> offsets() {
+            return Collections.unmodifiableMap(offsets);
+        }
+
+        public int numCommittableMessages() {
+            return numCommittableMessages;
+        }
+
+        public int numUncommittableMessages() {
+            return numUncommittableMessages;
+        }
+
+        public int numDeques() {
+            return numDeques;
+        }
+
+        public int largestDequeSize() {
+            return largestDequeSize;
+        }
+
+        public RecordPartition largestDequePartition() {
+            return largestDequePartition;
+        }
+
+        public boolean hasPending() {
+            return numUncommittableMessages > 0;
+        }
+
+
+        public boolean isEmpty() {
+            return numCommittableMessages == 0 && numUncommittableMessages == 0 && offsets.isEmpty();
+        }
+
+
+        public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) {
+            Map<RecordPartition, RecordOffset> offsets = new HashMap<>(this.offsets);
+            offsets.putAll(newerOffsets.offsets);
+
+            return new CommittableOffsets(
+                    offsets,
+                    this.numCommittableMessages + newerOffsets.numCommittableMessages,
+                    newerOffsets.numUncommittableMessages,
+                    newerOffsets.numDeques,
+                    newerOffsets.largestDequeSize,
+                    newerOffsets.largestDequePartition
+            );
+        }
+    }
+
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..9f1491c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * source task offset committer
+ */
+class SourceTaskOffsetCommitter {
+    private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+    private final ConnectConfig config;
+    private final ScheduledExecutorService commitExecutorService;
+    private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers;
+
+
+    SourceTaskOffsetCommitter(ConnectConfig config,
+                              ScheduledExecutorService commitExecutorService,
+                              ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers) {
+        this.config = config;
+        this.commitExecutorService = commitExecutorService;
+        this.committers = committers;
+    }
+
+    public SourceTaskOffsetCommitter(ConnectConfig config) {
+        this(config, Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(
+                SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", false)),
+                new ConcurrentHashMap<>());
+    }
+
+    public void close(long timeoutMs) {
+        commitExecutorService.shutdown();
+        try {
+            if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+                log.error("Graceful shutdown of offset commitOffsets thread timed out.");
+            }
+        } catch (InterruptedException e) {
+            // ignore and allow to exit immediately
+        }
+    }
+
+    public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+        long commitIntervalMs = config.getOffsetCommitIntervalMs();
+        ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(() -> {
+            commit(workerTask);
+        }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+        committers.put(id, commitFuture);
+    }
+
+    public void remove(ConnectorTaskId id) {
+        final ScheduledFuture<?> task = committers.remove(id);
+        if (task == null) {
+            return;
+        }
+
+        try {
+            task.cancel(false);
+            if (!task.isDone()) {
+                task.get();
+            }
+        } catch (CancellationException e) {
+            // ignore
+            log.trace("Offset commit thread was cancelled by another thread while removing connector task with id: {}", id);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter while removing task with id: " + id, e);
+        }
+    }
+
+    private void commit(WorkerSourceTask workerTask) {
+        log.debug("{} Committing offsets", workerTask);
+        try {
+            if (workerTask.commitOffsets()) {
+                return;
+            }
+            log.error("{} Failed to commit offsets", workerTask);
+        } catch (Throwable t) {
+            // We're very careful about exceptions here since any uncaught exceptions in the commit
+            // thread would cause the fixed interval schedule on the ExecutorService to stop running
+            // for that task
+            log.error("{} Unhandled exception when committing: ", workerTask, t);
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 70bb7ad..dec3e88 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -25,47 +25,49 @@ import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
 import io.openmessaging.connector.api.data.RecordConverter;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.service.TaskPositionCommitService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
 import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * A worker to schedule all connectors and tasks in a process.
  */
@@ -78,80 +80,60 @@ public class Worker {
     private Set<WorkerConnector> workingConnectors = new ConcurrentSet<>();
 
     /**
-     * Current running tasks.
+     * Current tasks state.
      */
     private Map<Runnable, Long/*timestamp*/> pendingTasks = new ConcurrentHashMap<>();
-
     private Set<Runnable> runningTasks = new ConcurrentSet<>();
-
-    private Set<Runnable> errorTasks = new ConcurrentSet<>();
-
-    private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();
-
     private Map<Runnable, Long/*timestamp*/> stoppingTasks = new ConcurrentHashMap<>();
-
     private Set<Runnable> stoppedTasks = new ConcurrentSet<>();
-
     private Set<Runnable> cleanedStoppedTasks = new ConcurrentSet<>();
+    private Set<Runnable> errorTasks = new ConcurrentSet<>();
+    private Set<Runnable> cleanedErrorTasks = new ConcurrentSet<>();
 
     Map<String, List<ConnectKeyValue>> latestTaskConfigs = new HashMap<>();
     /**
      * Current running tasks to its Future map.
      */
     private Map<Runnable, Future> taskToFutureMap = new ConcurrentHashMap<>();
-
     /**
      * Thread pool for connectors and tasks.
      */
     private final ExecutorService taskExecutor;
-
     /**
      * Position management for source tasks.
      */
     private final PositionManagementService positionManagementService;
-
     /**
      * A scheduled task to commit source position of source tasks.
      */
-    private final TaskPositionCommitService taskPositionCommitService;
-
-    private final ConnectConfig connectConfig;
-
+//    private final TaskPositionCommitService taskPositionCommitService;
+    private Optional<SourceTaskOffsetCommitter> sourceTaskOffsetCommitter;
+    private final ConnectConfig workerConfig;
     private final Plugin plugin;
 
-    private static final int MAX_START_TIMEOUT_MILLS = 1000 * 60;
-
-    private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
-
     /**
      * Atomic state variable
      */
     private AtomicReference<WorkerState> workerState;
-
     private StateMachineService stateMachineService = new StateMachineService();
-
     private final ConnectStatsManager connectStatsManager;
-
     private final ConnectStatsService connectStatsService;
 
-    public Worker(ConnectConfig connectConfig,
+    public Worker(ConnectConfig workerConfig,
                   PositionManagementService positionManagementService,
                   ConfigManagementService configManagementService,
                   Plugin plugin, AbstractConnectController connectController) {
-        this.connectConfig = connectConfig;
+        this.workerConfig = workerConfig;
         this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
         this.positionManagementService = positionManagementService;
-        this.taskPositionCommitService = new TaskPositionCommitService(
-            this,
-            positionManagementService);
         this.plugin = plugin;
         this.connectStatsManager = connectController.getConnectStatsManager();
         this.connectStatsService = connectController.getConnectStatsService();
+        this.sourceTaskOffsetCommitter = Optional.of(new SourceTaskOffsetCommitter(workerConfig));
     }
 
     public void start() {
         workerState = new AtomicReference<>(WorkerState.STARTED);
-        taskPositionCommitService.start();
         stateMachineService.start();
     }
 
@@ -164,7 +146,7 @@ public class Worker {
      * @throws Exception
      */
     public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
-        AbstractConnectController connectController) throws Exception {
+                                             AbstractConnectController connectController) throws Exception {
         Set<WorkerConnector> stoppedConnector = new HashSet<>();
         for (WorkerConnector workerConnector : workingConnectors) {
             try {
@@ -247,11 +229,11 @@ public class Worker {
         for (Runnable runnable : set) {
             ConnectKeyValue taskConfig = null;
             if (runnable instanceof WorkerSourceTask) {
-                taskConfig = ((WorkerSourceTask) runnable).getTaskConfig();
+                taskConfig = ((WorkerSourceTask) runnable).currentTaskConfig();
             } else if (runnable instanceof WorkerSinkTask) {
-                taskConfig = ((WorkerSinkTask) runnable).getTaskConfig();
+                taskConfig = ((WorkerSinkTask) runnable).currentTaskConfig();
             } else if (runnable instanceof WorkerDirectTask) {
-                taskConfig = ((WorkerDirectTask) runnable).getTaskConfig();
+                taskConfig = ((WorkerDirectTask) runnable).currentTaskConfig();
             }
             if (keyValue.equals(taskConfig)) {
                 return true;
@@ -267,19 +249,22 @@ public class Worker {
     public void stop() {
         workerState.set(WorkerState.TERMINATED);
         try {
+            sourceTaskOffsetCommitter.ifPresent(committer -> committer.close(5000));
             taskExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.error("Task termination error.", e);
         }
         stateMachineService.shutdown();
+
     }
 
+
     public Set<WorkerConnector> getWorkingConnectors() {
         return workingConnectors;
     }
 
     public void setWorkingConnectors(
-        Set<WorkerConnector> workingConnectors) {
+            Set<WorkerConnector> workingConnectors) {
         this.workingConnectors = workingConnectors;
     }
 
@@ -326,83 +311,246 @@ public class Worker {
 
     }
 
+    /**
+     * maintain task state
+     *
+     * @throws Exception
+     */
     public void maintainTaskState() throws Exception {
-
-        Map<String, List<ConnectKeyValue>> taskConfigs = new HashMap<>();
+        Map<String, List<ConnectKeyValue>> connectorConfig = new HashMap<>();
         synchronized (latestTaskConfigs) {
-            taskConfigs.putAll(latestTaskConfigs);
+            connectorConfig.putAll(latestTaskConfigs);
         }
 
-        boolean needCommitPosition = false;
+        //  STEP 1: check running tasks and put to error status
+        checkRunningTasks(connectorConfig);
+
+        // get new Tasks
+        Map<String, List<ConnectKeyValue>> newTasks = newTasks(connectorConfig);
+
+        //  STEP 2: try to create new tasks
+        startTask(newTasks);
+
+        //  STEP 3: check all pending state
+        checkPendingTask();
+
+        //  STEP 4 check stopping tasks
+        checkStoppingTasks();
+
+        //  STEP 5 check error tasks
+        checkErrorTasks();
+
+        //  STEP 6 check errorTasks and stopped tasks
+        checkStoppedTasks();
+    }
+
+    /**
+     * check running task
+     *
+     * @param connectorConfig
+     */
+    private void checkRunningTasks(Map<String, List<ConnectKeyValue>> connectorConfig) {
         //  STEP 1: check running tasks and put to error status
         for (Runnable runnable : runningTasks) {
             WorkerTask workerTask = (WorkerTask) runnable;
-            String connectorName = workerTask.getConnectorName();
-            ConnectKeyValue taskConfig = workerTask.getTaskConfig();
-            List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
+            String connectorName = workerTask.id().connector();
+            ConnectKeyValue taskConfig = workerTask.currentTaskConfig();
+            List<ConnectKeyValue> taskConfigs = connectorConfig.get(connectorName);
             WorkerTaskState state = ((WorkerTask) runnable).getState();
-
-            if (WorkerTaskState.ERROR == state) {
-                errorTasks.add(runnable);
-                runningTasks.remove(runnable);
-            } else if (WorkerTaskState.RUNNING == state) {
-                boolean needStop = true;
-                if (null != keyValues && keyValues.size() > 0) {
-                    for (ConnectKeyValue keyValue : keyValues) {
-                        if (keyValue.equals(taskConfig)) {
-                            needStop = false;
-                            break;
+            switch (state) {
+                case ERROR:
+                    errorTasks.add(runnable);
+                    runningTasks.remove(runnable);
+                    break;
+                case RUNNING:
+                    if (isNeedStop(taskConfig, taskConfigs)) {
+                        try {
+                            // remove committer offset
+                            sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+                            workerTask.doClose();
+                        } catch (Exception e) {
+                            log.error("workerTask stop exception, workerTask: " + workerTask.currentTaskConfig(), e);
                         }
+                        log.info("Task stopping, connector name {}, config {}", workerTask.id().connector(), workerTask.currentTaskConfig());
+                        runningTasks.remove(runnable);
+                        stoppingTasks.put(runnable, System.currentTimeMillis());
                     }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state);
+                    break;
+            }
+        }
+    }
+
+    /**
+     * check is need stop
+     *
+     * @param taskConfig
+     * @param keyValues
+     * @return
+     */
+    private boolean isNeedStop(ConnectKeyValue taskConfig, List<ConnectKeyValue> keyValues) {
+        if (CollectionUtils.isEmpty(keyValues)) {
+            return true;
+        }
+        for (ConnectKeyValue keyValue : keyValues) {
+            if (keyValue.equals(taskConfig)) {
+                // not stop
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * check stopped tasks
+     */
+    private void checkStoppedTasks() {
+        for (Runnable runnable : stoppedTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            workerTask.cleanup();
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStartTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
                 }
+            } catch (ExecutionException e) {
+                Throwable t = e.getCause();
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                t.printStackTrace();
+            } catch (CancellationException e) {
+                log.info("[BUG] Stopped Tasks throws PrintStackTrace");
+                e.printStackTrace();
+            } catch (TimeoutException e) {
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                e.printStackTrace();
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+                future.cancel(true);
+                taskToFutureMap.remove(runnable);
+                stoppedTasks.remove(runnable);
+                cleanedStoppedTasks.add(runnable);
+            }
+        }
+    }
 
-                if (needStop) {
-                    try {
-                        workerTask.stop();
-                    } catch (Exception e) {
-                        log.error("workerTask stop exception, workerTask: " + workerTask.getTaskConfig(), e);
-                    }
-                    log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig());
-                    runningTasks.remove(runnable);
-                    stoppingTasks.put(runnable, System.currentTimeMillis());
-                    needCommitPosition = true;
+    private void checkErrorTasks() {
+        for (Runnable runnable : errorTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStopTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] errorTasks reference not found in taskFutureMap");
                 }
-            } else {
-                log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
-                    ((WorkerTask) runnable).getConnectorName(), state.toString());
+            } catch (ExecutionException e) {
+                log.error("Execution exception , {}", e);
+            } catch (CancellationException | TimeoutException | InterruptedException e) {
+                log.error("error, {}", e);
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+
+                future.cancel(true);
+                workerTask.cleanup();
+                taskToFutureMap.remove(runnable);
+                errorTasks.remove(runnable);
+                cleanedErrorTasks.add(runnable);
             }
         }
+    }
 
-        //If some tasks are closed, synchronize the position.
-        if (needCommitPosition) {
-            taskPositionCommitService.commitTaskPosition();
+    private void checkStoppingTasks() {
+        for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
+            Runnable runnable = entry.getKey();
+            Long stopTimestamp = entry.getValue();
+            Long currentTimeMillis = System.currentTimeMillis();
+            Future future = taskToFutureMap.get(runnable);
+            WorkerTaskState state = ((WorkerTask) runnable).getState();
+            // exited normally
+            switch (state) {
+                case STOPPED:
+                    // concurrent modification Exception ? Will it pop that in the
+                    if (null == future || !future.isDone()) {
+                        log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
+                    }
+                    stoppingTasks.remove(runnable);
+                    stoppedTasks.add(runnable);
+                    break;
+                case ERROR:
+                    stoppingTasks.remove(runnable);
+                    errorTasks.add(runnable);
+                    break;
+                case STOPPING:
+                    if (currentTimeMillis - stopTimestamp > workerConfig.getMaxStopTimeoutMills()) {
+                        ((WorkerTask) runnable).timeout();
+                        stoppingTasks.remove(runnable);
+                        errorTasks.add(runnable);
+                    }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state.toString());
+            }
         }
+    }
 
-        // get new Tasks
-        Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
-        for (String connectorName : taskConfigs.keySet()) {
-            for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
-                boolean isNewTask = true;
-                if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
-                    isNewTask = false;
-                }
-                if (isNewTask) {
-                    if (!newTasks.containsKey(connectorName)) {
-                        newTasks.put(connectorName, new ArrayList<>());
+    private void checkPendingTask() {
+        for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
+            Runnable runnable = entry.getKey();
+            Long startTimestamp = entry.getValue();
+            Long currentTimeMillis = System.currentTimeMillis();
+            WorkerTaskState state = ((WorkerTask) runnable).getState();
+            switch (state) {
+                case ERROR:
+                    errorTasks.add(runnable);
+                    pendingTasks.remove(runnable);
+                    break;
+                case RUNNING:
+                    runningTasks.add(runnable);
+                    pendingTasks.remove(runnable);
+                    break;
+                case NEW:
+                    log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
+                    break;
+                case PENDING:
+                    if (currentTimeMillis - startTimestamp > workerConfig.getMaxStartTimeoutMills()) {
+                        ((WorkerTask) runnable).timeout();
+                        pendingTasks.remove(runnable);
+                        errorTasks.add(runnable);
                     }
-                    log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
-                    newTasks.get(connectorName).add(keyValue);
-                }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state.toString());
+                    break;
             }
         }
+    }
 
-        //  STEP 2: try to create new tasks
-        int taskId = 0;
+    /**
+     * start task
+     *
+     * @param newTasks
+     * @throws Exception
+     */
+    private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
         for (String connectorName : newTasks.keySet()) {
             for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
+                int taskId = keyValue.getInt(RuntimeConfigDefine.TASK_ID);
+                ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);
                 String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
                 if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
-                    createDirectTask(connectorName, keyValue);
+                    createDirectTask(id, keyValue);
                     continue;
                 }
                 try {
@@ -422,7 +570,7 @@ public class Worker {
                     RecordConverter recordConverter = null;
                     if (StringUtils.isNotEmpty(converterClazzName)) {
                         recordConverter = Class.forName(converterClazzName)
-                                .asSubclass(io.openmessaging.connector.api.data.RecordConverter.class)
+                                .asSubclass(RecordConverter.class)
                                 .getDeclaredConstructor()
                                 .newInstance();
                         recordConverter.configure(keyValue.getProperties());
@@ -432,36 +580,36 @@ public class Worker {
                         Plugin.compareAndSwapLoaders(loader);
                     }
                     if (task instanceof SourceTask) {
-
-                        DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+                        DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                         // create retry operator
                         RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
                         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
 
-                        WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
-                            (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+                        WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
+                                (SourceTask) task, loader, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
                         Plugin.compareAndSwapLoaders(currentThreadLoader);
-
                         Future future = taskExecutor.submit(workerSourceTask);
+                        // schedule offset committer
+                        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
+
                         taskToFutureMap.put(workerSourceTask, future);
                         this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
+
                     } else if (task instanceof SinkTask) {
                         log.info("sink task config keyValue is {}", keyValue.getProperties());
-                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue, ++taskId);
-                        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+                        DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(workerConfig, id, keyValue);
+                        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
                         if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
-                            ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+                            ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
                         }
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
-
                         // create retry operator
                         RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
-                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, connectConfig));
+                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
 
-
-                        WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
-                            (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+                        WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
+                                (SinkTask) task, loader, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
                                 retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
                         Plugin.compareAndSwapLoaders(currentThreadLoader);
                         Future future = taskExecutor.submit(workerSinkTask);
@@ -473,137 +621,59 @@ public class Worker {
                 }
             }
         }
+    }
 
-        //  STEP 3: check all pending state
-        for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
-            Runnable runnable = entry.getKey();
-            Long startTimestamp = entry.getValue();
-            Long currentTimeMillis = System.currentTimeMillis();
-            WorkerTaskState state = ((WorkerTask) runnable).getState();
-
-            if (WorkerTaskState.ERROR == state) {
-                errorTasks.add(runnable);
-                pendingTasks.remove(runnable);
-            } else if (WorkerTaskState.RUNNING == state) {
-                runningTasks.add(runnable);
-                pendingTasks.remove(runnable);
-            } else if (WorkerTaskState.NEW == state) {
-                log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
-            } else if (WorkerTaskState.PENDING == state) {
-                if (currentTimeMillis - startTimestamp > MAX_START_TIMEOUT_MILLS) {
-                    ((WorkerTask) runnable).timeout();
-                    pendingTasks.remove(runnable);
-                    errorTasks.add(runnable);
-                }
-            } else {
-                log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
-                    ((WorkerTask) runnable).getConnectorName(), state.toString());
-            }
-        }
-
-        //  STEP 4 check stopping tasks
-        for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
-            Runnable runnable = entry.getKey();
-            Long stopTimestamp = entry.getValue();
-            Long currentTimeMillis = System.currentTimeMillis();
-            Future future = taskToFutureMap.get(runnable);
-            WorkerTaskState state = ((WorkerTask) runnable).getState();
-            // exited normally
-
-            if (WorkerTaskState.STOPPED == state) {
-                // concurrent modification Exception ? Will it pop that in the
-
-                if (null == future || !future.isDone()) {
-                    log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
-                }
-                stoppingTasks.remove(runnable);
-                stoppedTasks.add(runnable);
-            } else if (WorkerTaskState.ERROR == state) {
-                stoppingTasks.remove(runnable);
-                errorTasks.add(runnable);
-            } else if (WorkerTaskState.STOPPING == state) {
-                if (currentTimeMillis - stopTimestamp > MAX_STOP_TIMEOUT_MILLS) {
-                    ((WorkerTask) runnable).timeout();
-                    stoppingTasks.remove(runnable);
-                    errorTasks.add(runnable);
-                }
-            } else {
-
-                log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
-                    ((WorkerTask) runnable).getConnectorName(), state.toString());
-            }
-        }
-
-        //  STEP 5 check errorTasks and stopped tasks
-        for (Runnable runnable : errorTasks) {
-            WorkerTask workerTask = (WorkerTask) runnable;
-            Future future = taskToFutureMap.get(runnable);
-
-            try {
-                if (null != future) {
-                    future.get(1000 * 60, TimeUnit.MILLISECONDS);
-                } else {
-                    log.error("[BUG] errorTasks reference not found in taskFutureMap");
+    private Map<String, List<ConnectKeyValue>> newTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
+        Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
+        for (String connectorName : taskConfigs.keySet()) {
+            for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
+                boolean isNewTask = true;
+                if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
+                    isNewTask = false;
                 }
-            } catch (ExecutionException e) {
-                Throwable t = e.getCause();
-                log.error("Execution exception , {}", e);
-            } catch (CancellationException | TimeoutException | InterruptedException e) {
-                log.error("error, {}", e);
-            } finally {
-                future.cancel(true);
-                workerTask.cleanup();
-                taskToFutureMap.remove(runnable);
-                errorTasks.remove(runnable);
-                cleanedErrorTasks.add(runnable);
-
-            }
-        }
-
-        //  STEP 5 check errorTasks and stopped tasks
-        for (Runnable runnable : stoppedTasks) {
-            WorkerTask workerTask = (WorkerTask) runnable;
-            workerTask.cleanup();
-            Future future = taskToFutureMap.get(runnable);
-            try {
-                if (null != future) {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                } else {
-                    log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
+                if (isNewTask) {
+                    if (!newTasks.containsKey(connectorName)) {
+                        newTasks.put(connectorName, new ArrayList<>());
+                    }
+                    log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
+                    newTasks.get(connectorName).add(keyValue);
                 }
-            } catch (ExecutionException e) {
-                Throwable t = e.getCause();
-                log.info("[BUG] Stopped Tasks should not throw any exception");
-                t.printStackTrace();
-            } catch (CancellationException e) {
-                log.info("[BUG] Stopped Tasks throws PrintStackTrace");
-                e.printStackTrace();
-            } catch (TimeoutException e) {
-                log.info("[BUG] Stopped Tasks should not throw any exception");
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                log.info("[BUG] Stopped Tasks should not throw any exception");
-                e.printStackTrace();
-            } finally {
-                future.cancel(true);
-                taskToFutureMap.remove(runnable);
-                stoppedTasks.remove(runnable);
-                cleanedStoppedTasks.add(runnable);
             }
         }
+        return newTasks;
     }
 
-    private void createDirectTask(String connectorName, ConnectKeyValue keyValue) throws Exception {
+    private void createDirectTask(ConnectorTaskId id, ConnectKeyValue keyValue) throws Exception {
         String sourceTaskClass = keyValue.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS);
         Task sourceTask = getTask(sourceTaskClass);
 
         String sinkTaskClass = keyValue.getString(RuntimeConfigDefine.SINK_TASK_CLASS);
         Task sinkTask = getTask(sinkTaskClass);
 
-        WorkerDirectTask workerDirectTask = new WorkerDirectTask(connectorName,
-            (SourceTask) sourceTask, (SinkTask) sinkTask, keyValue, positionManagementService, workerState);
+        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+        // create retry operator
+        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));
+
+        WorkerDirectTask workerDirectTask = new WorkerDirectTask(
+                workerConfig,
+                id,
+                (SourceTask) sourceTask,
+                null,
+                (SinkTask) sinkTask,
+                keyValue,
+                positionManagementService,
+                workerState,
+                connectStatsManager,
+                connectStatsService,
+                transformChain,
+                retryWithToleranceOperator);
 
         Future future = taskExecutor.submit(workerDirectTask);
+
+        // schedule offset committer
+        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerDirectTask));
+
         taskToFutureMap.put(workerDirectTask, future);
         this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
     }
@@ -632,7 +702,6 @@ public class Worker {
         @Override
         public void run() {
             log.info(this.getServiceName() + " service started");
-
             while (!this.isStopped()) {
                 this.waitForRunning(1000);
                 try {
@@ -642,7 +711,6 @@ public class Worker {
                     log.error("RebalanceImpl#StateMachineService start connector or task failed", e);
                 }
             }
-
             log.info(this.getServiceName() + " service end");
         }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 3a56d72..ce56766 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -27,35 +27,32 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
-import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * A wrapper of {@link SinkTask} and {@link SourceTask} for runtime.
  */
-public class WorkerDirectTask implements WorkerTask {
+public class WorkerDirectTask extends WorkerSourceTask {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    /**
-     * Connector name of current task.
-     */
-    private String connectorName;
-
     /**
      * The implements of the source task.
      */
@@ -65,142 +62,72 @@ public class WorkerDirectTask implements WorkerTask {
      * The implements of the sink task.
      */
     private SinkTask sinkTask;
-
-    /**
-     * The configs of current sink task.
-     */
-    private ConnectKeyValue taskConfig;
-
-    /**
-     * Atomic state variable
-     */
-    private AtomicReference<WorkerTaskState> state;
-
-    private final PositionManagementService positionManagementService;
-
     private final OffsetStorageReader positionStorageReader;
 
-    private final PositionStorageWriter positionStorageWriter;
-
-    private final AtomicReference<WorkerState> workerState;
-
-    public WorkerDirectTask(String connectorName,
-        SourceTask sourceTask,
-        SinkTask sinkTask,
-        ConnectKeyValue taskConfig,
-        PositionManagementService positionManagementService,
-        AtomicReference<WorkerState> workerState) {
-        this.connectorName = connectorName;
+    public WorkerDirectTask(ConnectConfig workerConfig,
+                            ConnectorTaskId id,
+                            SourceTask sourceTask,
+                            ClassLoader classLoader,
+                            SinkTask sinkTask,
+                            ConnectKeyValue taskConfig,
+                            PositionManagementService positionManagementService,
+                            AtomicReference<WorkerState> workerState,
+                            ConnectStatsManager connectStatsManager,
+                            ConnectStatsService connectStatsService,
+                            TransformChain<ConnectRecord> transformChain,
+                            RetryWithToleranceOperator retryWithToleranceOperator) {
+        super(workerConfig,
+                id,
+                sourceTask,
+                classLoader,
+                taskConfig,
+                positionManagementService,
+                null,
+                null,
+                workerState,
+                connectStatsManager,
+                connectStatsService,
+                transformChain,
+                retryWithToleranceOperator
+        );
         this.sourceTask = sourceTask;
         this.sinkTask = sinkTask;
-        this.taskConfig = taskConfig;
-        this.positionManagementService = positionManagementService;
-        this.positionStorageReader = new PositionStorageReaderImpl(connectorName, positionManagementService);
-        this.positionStorageWriter = new PositionStorageWriter(connectorName, positionManagementService);
-        this.state = new AtomicReference<>(WorkerTaskState.NEW);
-        this.workerState = workerState;
-    }
-
-    /**
-     * Start a source task, and send data entry to MQ cyclically.
-     */
-    @Override
-    public void run() {
-        try {
-            starkSinkTask();
-            startSourceTask();
-            log.info("Direct task start, config:{}", JSON.toJSONString(taskConfig));
-            while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
-                try {
-                    Collection<ConnectRecord> toSendEntries = sourceTask.poll();
-                    if (null != toSendEntries && toSendEntries.size() > 0) {
-                        sendRecord(toSendEntries);
-                    }
-                } catch (Exception e) {
-                    log.error("Direct task runtime exception", e);
-                    state.set(WorkerTaskState.ERROR);
-                }
-            }
-            stopSourceTask();
-            stopSinkTask();
-            state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
-            log.info("Direct task stop, config:{}", JSON.toJSONString(taskConfig));
-        } catch (Exception e) {
-            log.error("Run task failed.", e);
-            state.set(WorkerTaskState.ERROR);
-        }
+        this.positionStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
     }
 
     private void sendRecord(Collection<ConnectRecord> sourceDataEntries) {
-        List<ConnectRecord> sinkDataEntries = new ArrayList<>(sourceDataEntries.size());
-        Map<RecordPartition, RecordOffset> map = new HashMap<>();
-        for (ConnectRecord sourceDataEntry : sourceDataEntries) {
-            sinkDataEntries.add(sourceDataEntry);
-            RecordPartition recordPartition = sourceDataEntry.getPosition().getPartition();
-            RecordOffset recordOffset = sourceDataEntry.getPosition().getOffset();
-            if (null != recordPartition && null != recordOffset) {
-                map.put(recordPartition, recordOffset);
+        List<ConnectRecord> records = new ArrayList<>(sourceDataEntries.size());
+        List<RecordOffsetManagement.SubmittedPosition> positions = new ArrayList<>();
+        for (ConnectRecord preTransformRecord : sourceDataEntries) {
+
+            retryWithToleranceOperator.sourceRecord(preTransformRecord);
+            ConnectRecord record = transformChain.doTransforms(preTransformRecord);
+            if (record == null) {
+                continue;
             }
+
+            records.add(record);
+            /**prepare to send record*/
+            positions.add(prepareToSendRecord(preTransformRecord).get());
+
         }
         try {
-            sinkTask.put(sinkDataEntries);
-            try {
-                if (!MapUtils.isEmpty(map)) {
-                    map.forEach(positionStorageWriter::putPosition);
-                }
-            } catch (Exception e) {
-                log.error("Source task save position info failed.", e);
-            }
+            sinkTask.put(records);
+            // ack
+            positions.forEach(submittedPosition -> {
+                submittedPosition.ack();
+            });
         } catch (Exception e) {
+            // drop commit
+            positions.forEach(submittedPosition -> {
+                submittedPosition.remove();
+            });
             log.error("Send message error, error info: {}.", e);
         }
     }
 
     private void starkSinkTask() {
-        sinkTask.init(new SinkTaskContext() {
-
-            @Override
-            public String getConnectorName() {
-                return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
-            }
-
-            @Override
-            public String getTaskName() {
-                return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
-            }
-          
-            /**
-             * Get the configurations of current task.
-             *
-             * @return the configuration of current task.
-             */
-            @Override
-            public KeyValue configs() {
-                return taskConfig;
-            }
-            @Override
-            public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
-
-            }
-            @Override
-            public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
-
-            }
-
-            @Override
-            public void pause(List<RecordPartition> partitions) {
-
-            }
-
-            @Override
-            public void resume(List<RecordPartition> partitions) {
-
-            }
-            @Override
-            public Set<RecordPartition> assignment() {
-                return null;
-            }
-        });
+        sinkTask.init(new DirectSinkTaskContext());
         sinkTask.start(taskConfig);
         log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
     }
@@ -211,32 +138,8 @@ public class WorkerDirectTask implements WorkerTask {
     }
 
     private void startSourceTask() {
-        state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-        sourceTask.init(new SourceTaskContext() {
-            @Override public OffsetStorageReader offsetStorageReader() {
-                return positionStorageReader;
-            }
-
-            @Override public String getConnectorName() {
-                return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
-            }
-
-            @Override public String getTaskName() {
-                return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
-            }
-
-            /**
-             * Get the configurations of current task.
-             *
-             * @return the configuration of current task.
-             */
-            @Override
-            public KeyValue configs() {
-                return taskConfig;
-            }
-        });
+        sourceTask.init(new DirectSourceTaskContext());
         sourceTask.start(taskConfig);
-        state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
         log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
     }
 
@@ -245,46 +148,175 @@ public class WorkerDirectTask implements WorkerTask {
         log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
     }
 
+    /**
+     * initinalize and start
+     */
     @Override
-    public WorkerTaskState getState() {
-        return this.state.get();
-    }
-
-    @Override
-    public void stop() {
-        state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+    protected void initializeAndStart() {
+        starkSinkTask();
+        startSourceTask();
+        log.info("Direct task start, config:{}", JSON.toJSONString(taskConfig));
     }
 
+    /**
+     * execute poll and send record
+     */
     @Override
-    public void cleanup() {
-        if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
-            state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) {
-        } else {
-            log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+    protected void execute() {
+        while (isRunning()) {
+            updateCommittableOffsets();
+            try {
+                Collection<ConnectRecord> toSendEntries = sourceTask.poll();
+                if (toSendEntries.isEmpty()) {
+                    sendRecord(toSendEntries);
+                }
+            } catch (Exception e) {
+                log.error("Direct task runtime exception", e);
+                finalOffsetCommit(true);
+                onFailure(e);
+            }
         }
     }
 
+    /**
+     * close resources
+     */
     @Override
-    public String getConnectorName() {
-        return connectorName;
+    public void close() {
+        stopSourceTask();
+        stopSinkTask();
     }
 
-    @Override
-    public ConnectKeyValue getTaskConfig() {
-        return taskConfig;
-    }
 
-    @Override
-    public Object getJsonObject() {
-        HashMap obj = new HashMap<String, Object>();
-        obj.put("connectorName", connectorName);
-        obj.put("configs", JSON.toJSONString(taskConfig));
-        obj.put("state", state.get().toString());
-        return obj;
+    /**
+     * direct sink task context
+     */
+    private class DirectSinkTaskContext implements SinkTaskContext {
+
+        /**
+         * Get the Connector Name
+         *
+         * @return connector name
+         */
+        @Override
+        public String getConnectorName() {
+            return id().connector();
+        }
+
+        /**
+         * Get the Task Name of connector.
+         *
+         * @return task name
+         */
+        @Override
+        public String getTaskName() {
+            return id().task() + "";
+        }
+
+        /**
+         * Get the configurations of current task.
+         *
+         * @return the configuration of current task.
+         */
+        @Override
+        public KeyValue configs() {
+            return taskConfig;
+        }
+
+        /**
+         * Reset the consumer offset for the given queue.
+         *
+         * @param recordPartition the partition to reset offset.
+         * @param recordOffset    the offset to reset to.
+         */
+        @Override
+        public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+            // no-op
+        }
+
+        /**
+         * Reset the offsets for the given partition.
+         *
+         * @param offsets the map of offsets for targetPartition.
+         */
+        @Override
+        public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+            // no-op
+        }
+
+        /**
+         * Pause consumption of messages from the specified partition.
+         *
+         * @param partitions the partition list to be reset offset.
+         */
+        @Override
+        public void pause(List<RecordPartition> partitions) {
+            // no-op
+        }
+
+        /**
+         * Resume consumption of messages from previously paused Partition.
+         *
+         * @param partitions the partition list to be resume.
+         */
+        @Override
+        public void resume(List<RecordPartition> partitions) {
+            // no-op
+        }
+
+        /**
+         * Current task assignment processing partition
+         *
+         * @return the partition list
+         */
+        @Override
+        public Set<RecordPartition> assignment() {
+            return null;
+        }
     }
 
-    @Override
-    public void timeout() {
-        this.state.set(WorkerTaskState.ERROR);
+
+    private class DirectSourceTaskContext implements SourceTaskContext {
+
+        /**
+         * Get the OffsetStorageReader for this SourceTask.
+         *
+         * @return offset storage reader
+         */
+        @Override
+        public OffsetStorageReader offsetStorageReader() {
+            return positionStorageReader;
+        }
+
+        /**
+         * Get the Connector Name
+         *
+         * @return connector name
+         */
+        @Override
+        public String getConnectorName() {
+            return id().connector();
+        }
+
+        /**
+         * Get the Task Id of connector.
+         *
+         * @return task name
+         */
+        @Override
+        public String getTaskName() {
+            return id().task() + "";
+        }
+
+        /**
+         * Get the configurations of current task.
+         *
+         * @return the configuration of current task.
+         */
+        @Override
+        public KeyValue configs() {
+            return taskConfig;
+        }
     }
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 6081fdd..70fb7bc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -28,20 +28,6 @@ import io.openmessaging.connector.api.data.SchemaAndValue;
 import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
 import io.openmessaging.internal.DefaultKeyValue;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -56,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.common.QueueState;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
 import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
@@ -64,53 +51,37 @@ import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * A wrapper of {@link SinkTask} for runtime.
  */
-public class WorkerSinkTask implements WorkerTask {
+public class WorkerSinkTask extends WorkerTask {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-    /**
-     * The configuration key that provides the list of topicNames that are inputs for this SinkTask.
-     */
-    public static final String QUEUENAMES_CONFIG = "topicNames";
-
-    /**
-     * The configuration key that provide the list of topicQueues that are inputs for this SinkTask; The config value
-     * format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2, use topicName1, brokerName1, queueId1
-     * can construct {@link MessageQueue}
-     */
-    public static final String TOPIC_QUEUES_CONFIG = "topicQueues";
-
-    /**
-     * Connector name of current task.
-     */
-    private String connectorName;
-
     /**
      * The implements of the sink task.
      */
     private SinkTask sinkTask;
 
-    /**
-     * The configs of current sink task.
-     */
-    private ConnectKeyValue taskConfig;
-
-    /**
-     * Atomic state variable
-     */
-    private AtomicReference<WorkerTaskState> state;
-
-    /**
-     * Stop retry limit
-     */
-
     /**
      * A RocketMQ consumer to pull message from MQ.
      */
@@ -144,23 +115,17 @@ public class WorkerSinkTask implements WorkerTask {
 
     private static final long PULL_MSG_ERROR_BACKOFF_MS = 1000 * 10;
     private static final long PULL_NO_MSG_BACKOFF_MS = 1000 * 3;
-
     private static final long PULL_MSG_ERROR_THRESHOLD = 16;
 
-    private final AtomicReference<WorkerState> workerState;
-
+    /**
+     * stat
+     */
     private final ConnectStatsManager connectStatsManager;
-
     private final ConnectStatsService connectStatsService;
 
     private final CountDownLatch stopPullMsgLatch;
-
     private WorkerSinkTaskContext sinkTaskContext;
-
-    private final TransformChain<ConnectRecord> transformChain;
-
     private WorkerErrorRecordReporter errorRecordReporter;
-    private RetryWithToleranceOperator retryWithToleranceOperator;
 
 
     public static final String BROKER_NAME = "brokerName";
@@ -180,91 +145,31 @@ public class WorkerSinkTask implements WorkerTask {
         }
     };
 
-    public WorkerSinkTask(String connectorName,
-        SinkTask sinkTask,
-        ConnectKeyValue taskConfig,
-        RecordConverter recordConverter,
-        DefaultMQPullConsumer consumer,
-        AtomicReference<WorkerState> workerState,
-        ConnectStatsManager connectStatsManager,
-        ConnectStatsService connectStatsService,
-        TransformChain<ConnectRecord> transformChain,
-        RetryWithToleranceOperator retryWithToleranceOperator,
-        WorkerErrorRecordReporter errorRecordReporter) {
-        this.connectorName = connectorName;
+    public WorkerSinkTask(ConnectConfig workerConfig,
+                          ConnectorTaskId id,
+                          SinkTask sinkTask,
+                          ClassLoader classLoader,
+                          ConnectKeyValue taskConfig,
+                          RecordConverter recordConverter,
+                          DefaultMQPullConsumer consumer,
+                          AtomicReference<WorkerState> workerState,
+                          ConnectStatsManager connectStatsManager,
+                          ConnectStatsService connectStatsService,
+                          TransformChain<ConnectRecord> transformChain,
+                          RetryWithToleranceOperator retryWithToleranceOperator,
+                          WorkerErrorRecordReporter errorRecordReporter) {
+        super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
         this.sinkTask = sinkTask;
-        this.taskConfig = taskConfig;
         this.consumer = consumer;
         this.recordConverter = recordConverter;
         this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
-        this.state = new AtomicReference<>(WorkerTaskState.NEW);
-        this.workerState = workerState;
         this.connectStatsManager = connectStatsManager;
         this.connectStatsService = connectStatsService;
         this.stopPullMsgLatch = new CountDownLatch(1);
-        this.transformChain = transformChain;
+        this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
         this.errorRecordReporter = errorRecordReporter;
-        this.retryWithToleranceOperator = retryWithToleranceOperator;
-        this.transformChain.retryWithToleranceOperator(retryWithToleranceOperator);
-    }
 
-    /**
-     * Start a sink task, and receive data entry from MQ cyclically.
-     */
-    @Override
-    public void run() {
-        try {
-            registTopics();
-            consumer.start();
-            log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
-            state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-            this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
-            sinkTask.init(sinkTaskContext);
-            sinkTask.start(taskConfig);
-            // we assume executed here means we are safe
-            log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
-            state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
-
-            while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
-                // this method can block up to 3 minutes long
-                try {
-                    preCommit(false);
-                    setQueueOffset();
-                    pullMessageFromQueues();
-                } catch (RetriableException e) {
-                    connectStatsManager.incSinkRecordPutTotalFailNums();
-                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                    log.error("Sink task RetriableException exception", e);
-                } catch (InterruptedException e) {
-                    connectStatsManager.incSinkRecordPutTotalFailNums();
-                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                    log.error("Sink task InterruptedException exception", e);
-                    throw e;
-                } catch (Throwable e) {
-                    state.set(WorkerTaskState.ERROR);
-                    log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
-                    connectStatsManager.incSinkRecordPutTotalFailNums();
-                    connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                } finally {
-                    // record sink read times
-                    connectStatsManager.incSinkRecordReadTotalTimes();
-                }
-            }
-
-            sinkTask.stop();
-            state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
-            log.info("Sink task stop, config:{}", JSON.toJSONString(taskConfig));
-
-        } catch (Exception e) {
-            log.error("Run task failed.", e);
-            state.set(WorkerTaskState.ERROR);
-        } finally {
-            if (consumer != null) {
-                consumer.shutdown();
-                log.info("Sink task consumer shutdown. config:{}", JSON.toJSONString(taskConfig));
-            }
-        }
     }
 
     private void setQueueOffset() {
@@ -285,10 +190,13 @@ public class WorkerSinkTask implements WorkerTask {
         this.sinkTaskContext.cleanQueuesOffsets();
     }
 
-    private void registTopics() {
+    /**
+     * sub topics
+     */
+    private void registryTopics() {
         Set<String> topics = SinkConnectorConfig.parseTopicList(taskConfig);
         if (org.apache.commons.collections4.CollectionUtils.isEmpty(topics)) {
-            throw new ConnectException("sink connector topics config can be null, please check sink connector config info");
+            throw new ConnectException("Sink connector topics config can be null, please check sink connector config info");
         }
         for (String topic : topics) {
             consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
@@ -301,7 +209,6 @@ public class WorkerSinkTask implements WorkerTask {
                             messageQueuesOffsetMap.remove(key, value);
                         }
                     });
-
                     Set<RecordPartition> waitRemoveQueueMetaDatas = new HashSet<>();
                     recordPartitions.forEach(key -> {
                         if (key.getPartition().get("topic").equals(topic)) {
@@ -359,8 +266,11 @@ public class WorkerSinkTask implements WorkerTask {
     }
 
     public void incPullTPS(String topic, int pullSize) {
-        consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
-            .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+        consumer.getDefaultMQPullConsumerImpl()
+                .getRebalanceImpl()
+                .getmQClientFactory()
+                .getConsumerStatsManager()
+                .incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
     }
 
     private void pullMessageFromQueues() throws InterruptedException {
@@ -372,13 +282,12 @@ public class WorkerSinkTask implements WorkerTask {
         }
         for (Map.Entry<MessageQueue, Long> entry : messageQueuesOffsetMap.entrySet()) {
             if (messageQueuesStateMap.containsKey(entry.getKey())) {
-                log.warn("sink task message queue state is not running, sink task id {}, queue info {}, queue state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), JSON.toJSONString(entry.getKey()), JSON.toJSONString(messageQueuesStateMap.get(entry.getKey())));
+                log.warn("sink task message queue state is not running, sink task id {}, queue info {}, queue state {}", id().toString(), JSON.toJSONString(entry.getKey()), JSON.toJSONString(messageQueuesStateMap.get(entry.getKey())));
                 continue;
             }
             log.info("START pullBlockIfNotFound, time started : {}", System.currentTimeMillis());
-
-            if (WorkerTaskState.RUNNING != state.get()) {
-                log.warn("sink task state is not running, sink task id {}, state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), state.get().name());
+            if (isStopping()) {
+                log.warn("sink task state is not running, sink task id {}, state {}", id().toString(), state.get().name());
                 break;
             }
             PullResult pullResult = null;
@@ -386,95 +295,75 @@ public class WorkerSinkTask implements WorkerTask {
             try {
                 shouldStopPullMsg();
                 pullResult = consumer.pull(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
+                if (pullResult == null) {
+                    continue;
+                }
                 pullMsgErrorCount = 0;
-            } catch (MQClientException e) {
+            } catch (MQClientException | RemotingException | MQBrokerException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
-                connectStatsManager.incSinkRecordReadTotalFailNums();
-                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
-                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
-            } catch (RemotingException e) {
-                pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
-                connectStatsManager.incSinkRecordReadTotalFailNums();
-                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
-                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
-            } catch (MQBrokerException e) {
-                pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
-                connectStatsManager.incSinkRecordReadTotalFailNums();
-                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
-                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message {}, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getClass().getName(), e.getMessage(), this.state.get(), e);
+                readRecordFail(beginPullMsgTimestamp);
             } catch (InterruptedException e) {
                 pullMsgErrorCount++;
                 log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
-                connectStatsManager.incSinkRecordReadTotalFailNums();
-                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
-                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+                readRecordFail(beginPullMsgTimestamp);
                 throw e;
             } catch (Throwable e) {
                 pullMsgErrorCount++;
                 log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
-                connectStatsManager.incSinkRecordReadTotalFailNums();
-                connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
-                connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
+                readRecordFail(beginPullMsgTimestamp);
                 throw e;
             }
-            long currentTime = System.currentTimeMillis();
 
             List<MessageExt> messages = null;
-            log.info("INSIDE pullMessageFromQueues, time elapsed : {}", currentTime - startTimeStamp);
-            if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.FOUND)) {
-                pullNotFountMsgCount = 0;
-                this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
-                messages = pullResult.getMsgFoundList();
-                connectStatsManager.incSinkRecordReadTotalNums(messages.size());
-                connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), messages.size());
-                long pullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
-                connectStatsManager.incSinkRecordReadTotalRT(pullRT);
-                connectStatsManager.incSinkRecordReadRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), pullRT);
-                receiveMessages(messages);
-                if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
-                    messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
-                } else {
-                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
-                }
-                try {
-                    consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
-                } catch (MQClientException e) {
-                    log.warn("updateConsumeOffset MQClientException, pullResult {}", pullResult, e);
-                }
-            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.OFFSET_ILLEGAL)) {
-                log.warn("offset illegal, reset offset, message queue {}, pull offset {}, nextBeginOffset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), pullResult.getNextBeginOffset());
-                this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
-            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_NEW_MSG)) {
-                pullNotFountMsgCount++;
-                log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
-            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_MATCHED_MSG)) {
-                log.info("no matched msg, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
-                this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
-            } else {
-                pullNotFountMsgCount++;
-                log.info("unknow pull msg state, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+            log.info("INSIDE pullMessageFromQueues, time elapsed : {}", System.currentTimeMillis() - startTimeStamp);
+            PullStatus status = pullResult.getPullStatus();
+            switch (status) {
+                case FOUND:
+                    pullNotFountMsgCount = 0;
+                    this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
+                    messages = pullResult.getMsgFoundList();
+                    recordReadSuccess(messages.size(), beginPullMsgTimestamp);
+                    receiveMessages(messages);
+                    if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
+                        // put offset
+                        messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
+                    } else {
+                        // load balancing
+                        log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+                    }
+                    try {
+                        consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
+                    } catch (MQClientException e) {
+                        log.warn("updateConsumeOffset MQClientException, pullResult {}", pullResult, e);
+                    }
+                    break;
+                case OFFSET_ILLEGAL:
+                    log.warn("Offset illegal, reset offset, message queue {}, pull offset {}, nextBeginOffset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), pullResult.getNextBeginOffset());
+                    this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+                    break;
+                case NO_NEW_MSG:
+                    pullNotFountMsgCount++;
+                    log.info("No new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+                    break;
+                case NO_MATCHED_MSG:
+                    log.info("no matched msg, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+                    this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+                    break;
+                default:
+                    pullNotFountMsgCount++;
+                    log.info("unknow pull msg state, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+                    break;
             }
 
-            AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+            AtomicLong atomicLong = connectStatsService.singleSinkTaskTimesTotal(id().toString());
             if (null != atomicLong) {
                 atomicLong.addAndGet(org.apache.commons.collections4.CollectionUtils.isEmpty(messages) ? 0 : messages.size());
             }
         }
     }
 
+
     private void shouldStopPullMsg() throws InterruptedException {
         if (pullMsgErrorCount == PULL_MSG_ERROR_THRESHOLD) {
             log.error("Accumulative error {} times, stop pull msg for {} ms", pullMsgErrorCount, PULL_MSG_ERROR_BACKOFF_MS);
@@ -508,37 +397,15 @@ public class WorkerSinkTask implements WorkerTask {
         }
     }
 
-    private void removePauseQueueMessage(MessageQueue messageQueue, List<MessageExt> messages) {
-        if (null != messageQueuesStateMap.get(messageQueue)) {
-            final Iterator<MessageExt> iterator = messages.iterator();
-            while (iterator.hasNext()) {
-                final MessageExt message = iterator.next();
-                String msgId = message.getMsgId();
-                log.info("BrokerName {}, topicName {}, queueId {} is pause, Discard the message {}", messageQueue.getBrokerName(), messageQueue.getTopic(), message.getQueueId(), msgId);
-                iterator.remove();
-            }
-        }
-    }
-
     @Override
-    public void stop() {
-        state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
-        try {
-            transformChain.close();
-        } catch (Exception exception) {
-            log.error("Transform close failed, {}", exception);
-        }
+    public void close() {
+        sinkTask.stop();
+        consumer.shutdown();
+        stopPullMsgLatch.countDown();
+        Utils.closeQuietly(transformChain, "transform chain");
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
-    @Override
-    public void cleanup() {
-        if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
-            state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED))
-            consumer.shutdown();
-        else {
-            log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
-        }
-    }
 
     /**
      * receive message from MQ.
@@ -546,28 +413,17 @@ public class WorkerSinkTask implements WorkerTask {
      * @param messages
      */
     private void receiveMessages(List<MessageExt> messages) {
-        List<ConnectRecord> sinkDataEntries = new ArrayList<>(32);
+        List<ConnectRecord> records = new ArrayList<>(32);
         for (MessageExt message : messages) {
             this.retryWithToleranceOperator.consumerRecord(message);
-            ConnectRecord sinkDataEntry = this.retryWithToleranceOperator.execute(() -> convertToSinkDataEntry(message), ErrorReporter.Stage.CONVERTER, WorkerSinkTask.class);
-            if (sinkDataEntry != null && !this.retryWithToleranceOperator.failed())
-            sinkDataEntries.add(sinkDataEntry);
-            String msgId = message.getMsgId();
-            log.info("Received one message success : msgId {}", msgId);
-        }
-        List<ConnectRecord> connectRecordList = new ArrayList<>(32);
-        for (ConnectRecord connectRecord : sinkDataEntries) {
-            ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
-            if (null != connectRecord1) {
-                connectRecordList.add(connectRecord1);
+            ConnectRecord connectRecord = convertMessages(message);
+            if (connectRecord != null && !this.retryWithToleranceOperator.failed()) {
+                records.add(connectRecord);
             }
-        }
-        if (CollectionUtils.isEmpty(connectRecordList)) {
-            log.info("after transforms connectRecordList is null");
-            return;
+            log.info("Received one message success : msgId {}", message.getMsgId());
         }
         try {
-            sinkTask.put(connectRecordList);
+            sinkTask.put(records);
             return;
         } catch (RetriableException e) {
             log.error("task {} put sink recode RetriableException", this, e.getMessage(), e);
@@ -579,15 +435,14 @@ public class WorkerSinkTask implements WorkerTask {
 
     }
 
-    private ConnectRecord convertToSinkDataEntry(MessageExt message) {
+    private ConnectRecord convertMessages(MessageExt message) {
         Map<String, String> properties = message.getProperties();
-        ConnectRecord sinkDataEntry;
-
+        ConnectRecord record;
         // start convert
         if (recordConverter == null) {
             final byte[] messageBody = message.getBody();
             String s = new String(messageBody);
-            sinkDataEntry = JSON.parseObject(s, ConnectRecord.class);
+            record = JSON.parseObject(s, ConnectRecord.class);
         } else {
             // timestamp
             String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
@@ -600,11 +455,21 @@ public class WorkerSinkTask implements WorkerTask {
             // convert
             SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> recordConverter.toConnectData(message.getTopic(), message.getBody()),
                     ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
-            sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
+            record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
+            if (retryWithToleranceOperator.failed()) {
+                return null;
+            }
         }
+
+        // Apply the transformations
+        ConnectRecord transformedRecord = transformChain.doTransforms(record);
+        if (transformedRecord == null) {
+            return null;
+        }
+
         // add extension
-        addExtension(properties, sinkDataEntry);
-        return sinkDataEntry;
+        addExtension(properties, record);
+        return record;
     }
 
     private void addExtension(Map<String, String> properties, ConnectRecord sinkDataEntry) {
@@ -623,46 +488,48 @@ public class WorkerSinkTask implements WorkerTask {
         sinkDataEntry.addExtension(keyValue);
     }
 
-    @Override
-    public String getConnectorName() {
-        return connectorName;
-    }
-
-    @Override
-    public WorkerTaskState getState() {
-        return state.get();
-    }
-
-    @Override
-    public ConnectKeyValue getTaskConfig() {
-        return taskConfig;
-    }
-
     /**
-     * Further we cant try to log what caused the error
+     * initinalize and start
      */
     @Override
-    public void timeout() {
-        this.state.set(WorkerTaskState.ERROR);
+    protected void initializeAndStart() {
+        registryTopics();
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+        }
+        log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
+        sinkTask.init(sinkTaskContext);
+        sinkTask.start(taskConfig);
     }
 
+    /**
+     * execute poll and send record
+     */
     @Override
-    public String toString() {
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("connectorName:" + connectorName)
-            .append("\nConfigs:" + JSON.toJSONString(taskConfig))
-            .append("\nState:" + state.get().toString());
-        return sb.toString();
-    }
+    protected void execute() {
+        while (isRunning()) {
+            // this method can block up to 3 minutes long
+            try {
+                preCommit(false);
+                setQueueOffset();
+                pullMessageFromQueues();
+            } catch (RetriableException e) {
+                readRecordFailNum();
+                log.error("Sink task RetriableException exception", e);
+            } catch (InterruptedException e) {
+                readRecordFailNum();
+                log.error("Sink task InterruptedException exception", e);
+            } catch (Throwable e) {
+                log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
+                readRecordFailNum();
+                throw e;
+            } finally {
+                // record sink read times
+                connectStatsManager.incSinkRecordReadTotalTimes();
+            }
+        }
 
-    @Override
-    public Object getJsonObject() {
-        HashMap obj = new HashMap<String, Object>();
-        obj.put("connectorName", connectorName);
-        obj.put("configs", JSON.toJSONString(taskConfig));
-        obj.put("state", state.get().toString());
-        return obj;
     }
 
     public Set<RecordPartition> getRecordPartitions() {
@@ -688,13 +555,60 @@ public class WorkerSinkTask implements WorkerTask {
         this.sinkTaskContext.resetOffset(offsets);
     }
 
+
+    private void removePauseQueueMessage(MessageQueue messageQueue, List<MessageExt> messages) {
+        if (null != messageQueuesStateMap.get(messageQueue)) {
+            final Iterator<MessageExt> iterator = messages.iterator();
+            while (iterator.hasNext()) {
+                final MessageExt message = iterator.next();
+                String msgId = message.getMsgId();
+                log.info("BrokerName {}, topicName {}, queueId {} is pause, Discard the message {}", messageQueue.getBrokerName(), messageQueue.getTopic(), message.getQueueId(), msgId);
+                iterator.remove();
+            }
+        }
+    }
+
     /**
      * error record reporter
+     *
      * @return
      */
     public WorkerErrorRecordReporter errorRecordReporter() {
         return errorRecordReporter;
     }
+
+    private void recordReadSuccess(int recordSize, long beginPullMsgTimestamp) {
+        long pullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
+        recordReadNums(recordSize);
+        recordReadRT(pullRT);
+    }
+
+    private void recordReadNums(int size) {
+        connectStatsManager.incSinkRecordReadTotalNums(size);
+        connectStatsManager.incSinkRecordReadNums(id().toString(), size);
+    }
+
+    private void recordReadRT(long pullRT) {
+        connectStatsManager.incSinkRecordReadTotalRT(pullRT);
+        connectStatsManager.incSinkRecordReadRT(id().toString(), pullRT);
+    }
+
+
+    private void readRecordFail(long beginPullMsgTimestamp) {
+        readRecordFailNum();
+        readRecordFailRT(System.currentTimeMillis() - beginPullMsgTimestamp);
+    }
+
+    private void readRecordFailRT(long errorPullRT) {
+        connectStatsManager.incSinkRecordReadTotalFailRT(errorPullRT);
+        connectStatsManager.incSinkRecordReadFailRT(id().toString(), errorPullRT);
+    }
+
+    private void readRecordFailNum() {
+        connectStatsManager.incSinkRecordReadTotalFailNums();
+        connectStatsManager.incSinkRecordReadFailNums(id().toString());
+    }
+
 }
 
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index e698d1d..6a774ad 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -150,7 +150,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             }
             messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
         }
-        this.workerSinkTask.stop();
+        this.workerSinkTask.close();
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 6711dc5..746c871 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -22,10 +22,8 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
-import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
-import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.RecordPosition;
 import io.openmessaging.connector.api.errors.ConnectException;
@@ -36,30 +34,40 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,39 +76,26 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTas
 /**
  * A wrapper of {@link SourceTask} for runtime.
  */
-public class WorkerSourceTask implements WorkerTask {
+public class WorkerSourceTask extends WorkerTask {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-    /**
-     * Connector name of current task.
-     */
-    private String connectorName;
-
+    private static final long SEND_FAILED_BACKOFF_MS = 100;
     /**
      * The implements of the source task.
      */
-    private SourceTask sourceTask;
-
-    /**
-     * The configs of current source task.
-     */
-    private ConnectKeyValue taskConfig;
+    private final SourceTask sourceTask;
 
-    /**
-     * Atomic state variable
-     */
-    private AtomicReference<WorkerTaskState> state;
+    protected final WorkerSourceTaskContext sourceTaskContext;
 
     /**
      * Used to read the position of source data source.
      */
-    private OffsetStorageReader offsetStorageReader;
+    private final OffsetStorageReader offsetStorageReader;
 
     /**
      * Used to write the position of source data source.
      */
-    private PositionStorageWriter positionStorageWriter;
+    private final PositionStorageWriter positionStorageWriter;
 
     /**
      * A RocketMQ producer to send message to dest MQ.
@@ -110,19 +105,21 @@ public class WorkerSourceTask implements WorkerTask {
     /**
      * A converter to parse source data entry to byte[].
      */
-    private RecordConverter recordConverter;
-
-    private final AtomicReference<WorkerState> workerState;
+    private final RecordConverter recordConverter;
 
-    private ConnectStatsManager connectStatsManager;
-
-    private ConnectStatsService connectStatsService;
+    /**
+     * stat connect
+     */
+    private final ConnectStatsManager connectStatsManager;
+    private final ConnectStatsService connectStatsService;
 
+    private final CountDownLatch stopRequestedLatch;
+    private final AtomicReference<Throwable> producerSendException;
     private List<ConnectRecord> toSendRecord;
 
-    private TransformChain<ConnectRecord> transformChain;
 
-    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
+    private final RecordOffsetManagement offsetManagement;
     /**
      * The property of message in WHITE_KEY_SET don't need add a connect prefix
      */
@@ -133,8 +130,10 @@ public class WorkerSourceTask implements WorkerTask {
         WHITE_KEY_SET.add(MessageConst.PROPERTY_TAGS);
     }
 
-    public WorkerSourceTask(String connectorName,
+    public WorkerSourceTask(ConnectConfig workerConfig,
+                            ConnectorTaskId id,
                             SourceTask sourceTask,
+                            ClassLoader classLoader,
                             ConnectKeyValue taskConfig,
                             PositionManagementService positionManagementService,
                             RecordConverter recordConverter,
@@ -144,248 +143,277 @@ public class WorkerSourceTask implements WorkerTask {
                             ConnectStatsService connectStatsService,
                             TransformChain<ConnectRecord> transformChain,
                             RetryWithToleranceOperator retryWithToleranceOperator) {
-        this.connectorName = connectorName;
+        super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
+
         this.sourceTask = sourceTask;
-        this.taskConfig = taskConfig;
-        this.offsetStorageReader = new PositionStorageReaderImpl(connectorName, positionManagementService);
-        this.positionStorageWriter = new PositionStorageWriter(connectorName, positionManagementService);
+        this.offsetStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
+        this.positionStorageWriter = new PositionStorageWriter(id.connector(), positionManagementService);
         this.producer = producer;
         this.recordConverter = recordConverter;
-        this.state = new AtomicReference<>(WorkerTaskState.NEW);
-        this.workerState = workerState;
         this.connectStatsManager = connectStatsManager;
         this.connectStatsService = connectStatsService;
-        this.transformChain = transformChain;
-        this.retryWithToleranceOperator = retryWithToleranceOperator;
-        this.transformChain.retryWithToleranceOperator(this.retryWithToleranceOperator);
-    }
-
-    /**
-     * Start a source task, and send data entry to MQ cyclically.
-     */
-    @Override
-    public void run() {
-        try {
-            producer.start();
-            log.info("Source task producer start.");
-            state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
-            sourceTask.init(new SourceTaskContext() {
-
-                @Override
-                public OffsetStorageReader offsetStorageReader() {
-                    return offsetStorageReader;
-                }
-
-                @Override
-                public String getConnectorName() {
-                    return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
-                }
-
-                @Override
-                public String getTaskName() {
-                    return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
-                }
-
-                /**
-                 * Get the configurations of current task.
-                 *
-                 * @return the configuration of current task.
-                 */
-                @Override
-                public KeyValue configs() {
-                    return taskConfig;
-                }
-            });
-            sourceTask.start(taskConfig);
-            state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
-            log.info("Source task start, config:{}", JSON.toJSONString(taskConfig));
-            while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
-                if (CollectionUtils.isEmpty(toSendRecord)) {
-                    try {
-                        toSendRecord = poll();
-                        if (null != toSendRecord && toSendRecord.size() > 0) {
-                            connectStatsManager.incSourceRecordPollTotalNums(toSendRecord.size());
-                            connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID), toSendRecord.size());
-                            sendRecord();
-                        }
-                    } catch (RetriableException e) {
-                        connectStatsManager.incSourceRecordPollTotalFailNums();
-                        connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                        log.error("Source task RetriableException exception", e);
-                    } catch (Exception e) {
-                        connectStatsManager.incSourceRecordPollTotalFailNums();
-                        connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                        log.error("Source task Exception exception", e);
-                        state.set(WorkerTaskState.ERROR);
-                    } finally {
-                        // record source poll times
-                        connectStatsManager.incSourceRecordPollTotalTimes();
-                    }
-                }
-                AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                if (null != atomicLong) {
-                    atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
-                }
-            }
-            sourceTask.stop();
-            state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
-            log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
-        } catch (Exception e) {
-            log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
-            state.set(WorkerTaskState.ERROR);
-        } finally {
-            if (producer != null) {
-                producer.shutdown();
-                log.info("Source task producer shutdown. task config {}", JSON.toJSONString(taskConfig));
-            }
-        }
+        this.sourceTaskContext = new WorkerSourceTaskContext(offsetStorageReader, this, taskConfig);
+        this.stopRequestedLatch = new CountDownLatch(1);
+        this.producerSendException = new AtomicReference<>();
+        this.offsetManagement = new RecordOffsetManagement();
+        this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
     }
 
     private List<ConnectRecord> poll() throws InterruptedException {
-        List<ConnectRecord> connectRecordList = null;
         try {
-            connectRecordList = sourceTask.poll();
-            if (CollectionUtils.isEmpty(connectRecordList)) {
+            List<ConnectRecord> connectRecords = sourceTask.poll();
+            if (CollectionUtils.isEmpty(connectRecords)) {
                 return null;
             }
-            List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
-            for (ConnectRecord connectRecord : connectRecordList) {
-
-                retryWithToleranceOperator.sourceRecord(connectRecord);
-
-                ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
-                if (null != connectRecord1 && !retryWithToleranceOperator.failed()) {
-                    connectRecordList1.add(connectRecord1);
-                }
-            }
-            return connectRecordList1;
+            return connectRecords;
         } catch (RetriableException e) {
             log.error("Source task RetriableException exception, taskconfig {}", JSON.toJSONString(taskConfig), e);
             return null;
         }
     }
 
+
     @Override
-    public void stop() {
-        state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
-        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+    public void close() {
+        producer.shutdown();
+        stopRequestedLatch.countDown();
         Utils.closeQuietly(transformChain, "transform chain");
-        log.warn("Stop a task success.");
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+        Utils.closeQuietly(positionStorageWriter, "position storage writer");
     }
 
-    @Override
-    public void cleanup() {
-        log.info("Cleaning a task, current state {}, destination state {}", state.get().name(), WorkerTaskState.TERMINATED.name());
-        if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) ||
-                state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) {
-            log.info("Cleaning a task success");
-        } else {
-            log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+    protected void updateCommittableOffsets() {
+        RecordOffsetManagement.CommittableOffsets newOffsets = offsetManagement.committableOffsets();
+        synchronized (this) {
+            this.committableOffsets = this.committableOffsets.updatedWith(newOffsets);
         }
     }
 
+
+    protected Optional<RecordOffsetManagement.SubmittedPosition> prepareToSendRecord(
+            ConnectRecord record
+    ) {
+        maybeThrowProducerSendException();
+        return Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
+    }
+
+
     /**
      * Send list of sourceDataEntries to MQ.
      */
-    private void sendRecord() throws InterruptedException, RemotingException, MQClientException {
-        for (ConnectRecord sourceDataEntry : toSendRecord) {
-            RecordPosition position = sourceDataEntry.getPosition();
-            RecordOffset offset = position.getOffset();
-
-            Message sourceMessage = new Message();
-            String topic = null;
-            topic = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
-            if (StringUtils.isBlank(topic)) {
-                RecordPosition recordPosition = sourceDataEntry.getPosition();
-                if (null == recordPosition) {
-                    log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
-                    return;
-                }
-                RecordPartition partition = recordPosition.getPartition();
-                if (null == partition) {
-                    log.error("connect-topicname config is null and partition is null , lack of topic config");
-                    return;
-                }
-                Map<String, ?> partitionMap = partition.getPartition();
-                if (null == partitionMap) {
-                    log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
-                    return;
-                }
-                Object o = partitionMap.get(TOPIC);
-                if (null == o) {
-                    log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
-                    return;
-                }
-                topic = (String) o;
-            }
-            if (StringUtils.isBlank(topic)) {
-                throw new ConnectException("source connect lack of topic config");
-            }
-            sourceMessage.setTopic(topic);
-            // converter
-            if (recordConverter == null) {
-                final byte[] messageBody = JSON.toJSONString(sourceDataEntry, SerializerFeature.DisableCircularReferenceDetect,  SerializerFeature.WriteMapNullValue).getBytes();
-                if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
-                    log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
-                    continue;
-                }
-                sourceMessage.setBody(messageBody);
-            } else {
-                String finalTopic = topic;
-                byte[] messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(finalTopic, sourceDataEntry.getSchema(), sourceDataEntry.getData()),
-                        ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
-                if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
-                    log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
-                    continue;
-                }
-                sourceMessage.setBody(messageBody);
+    private Boolean sendRecord() throws InterruptedException {
+        int processed = 0;
+        for (ConnectRecord preTransformRecord : toSendRecord) {
+            retryWithToleranceOperator.sourceRecord(preTransformRecord);
+            ConnectRecord record = transformChain.doTransforms(preTransformRecord);
+            String topic = maybeCreateAndGetTopic(record);
+            Message sourceMessage = convertTransformedRecord(topic, record);
+            if (sourceMessage == null || retryWithToleranceOperator.failed()) {
+                // commit record
+                recordFailed(preTransformRecord);
+                continue;
             }
-            // put extend msg property
-            putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
-
+            log.trace("{} Appending record to the topic {} , value {}", this, topic, record.getData());
+            /**prepare to send record*/
+            Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
             try {
                 producer.send(sourceMessage, new SendCallback() {
                     @Override
-                    public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+                    public void onSuccess(SendResult result) {
                         log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
-                        connectStatsManager.incSourceRecordWriteTotalNums();
-                        connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                        RecordPartition partition = position.getPartition();
-                        try {
-                            if (null != partition && null != position) {
-                                Map<String, String> offsetMap = (Map<String, String>) offset.getOffset();
-                                offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, String.valueOf(sourceDataEntry.getTimestamp()));
-                                positionStorageWriter.putPosition(partition, offset);
-                            }
-
-                        } catch (Exception e) {
-                            log.error("Source task save position info failed. partition {}, offset {}", JSON.toJSONString(partition), JSON.toJSONString(offset), e);
-                        }
+                        // metrics
+                        incWriteRecordStat();
+                        // commit record for custom
+                        recordSent(preTransformRecord, sourceMessage, result);
+                        // ack record position
+                        submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
                     }
 
                     @Override
                     public void onException(Throwable throwable) {
                         log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
-                        connectStatsManager.incSourceRecordWriteTotalFailNums();
-                        connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                        // fail record metrics
+                        inWriteRecordFail();
+                        // record send failed
+                        recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
                     }
                 });
-            } catch (MQClientException e) {
+            } catch (RetriableException e) {
+                log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
+                        this, sourceMessage.getTopic(), e);
+                // Intercepted as successfully sent, used to continue sending next time
+                toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
+                // remove pre submit position, for retry
+                submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::remove);
+                return false;
+            } catch (MQClientException | RemotingException e) {
                 log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
-                connectStatsManager.incSourceRecordWriteTotalFailNums();
-                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-            } catch (RemotingException e) {
-                log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e);
-                connectStatsManager.incSourceRecordWriteTotalFailNums();
-                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                inWriteRecordFail();
+                recordSendFailed(true, sourceMessage, preTransformRecord, e);
             } catch (InterruptedException e) {
                 log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
-                connectStatsManager.incSourceRecordWriteTotalFailNums();
-                connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                inWriteRecordFail();
                 throw e;
             }
+            processed++;
         }
         toSendRecord = null;
+        return true;
+    }
+
+    private void prepareToPollTask() {
+        maybeThrowProducerSendException();
+    }
+
+    private void maybeThrowProducerSendException() {
+        if (producerSendException.get() != null) {
+            throw new ConnectException(
+                    "Unrecoverable exception from producer send callback",
+                    producerSendException.get()
+            );
+        }
+    }
+
+    private void recordSendFailed(
+            boolean synchronous,
+            Message sourceMessage,
+            ConnectRecord preTransformRecord,
+            Throwable e) {
+        if (synchronous) {
+            throw new ConnectException("Unrecoverable exception trying to send", e);
+        }
+        String topic = sourceMessage.getTopic();
+        if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
+            // ignore all error
+            log.trace(
+                    "Ignoring failed record send: {} failed to send record to {}: ",
+                    WorkerSourceTask.this,
+                    topic,
+                    e
+            );
+            retryWithToleranceOperator.executeFailed(
+                    ErrorReporter.Stage.ROCKETMQ_PRODUCE,
+                    WorkerSourceTask.class,
+                    preTransformRecord,
+                    e);
+            commitTaskRecord(preTransformRecord, null);
+        } else {
+            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+            producerSendException.compareAndSet(null, e);
+        }
+    }
+
+
+    /**
+     * failed send
+     *
+     * @param record
+     */
+    private void recordFailed(ConnectRecord record) {
+        commitTaskRecord(record, null);
+    }
+
+
+    /**
+     * send success record
+     *
+     * @param preTransformRecord
+     * @param sourceMessage
+     * @param result
+     */
+    private void recordSent(
+            ConnectRecord preTransformRecord,
+            Message sourceMessage,
+            SendResult result) {
+        commitTaskRecord(preTransformRecord, result);
+    }
+
+    private void commitTaskRecord(ConnectRecord preTransformRecord, SendResult result) {
+        ConnectKeyValue keyValue = null;
+        if (result != null) {
+            keyValue = new ConnectKeyValue();
+            keyValue.put("send.status", result.getSendStatus().name());
+            keyValue.put("msg.id", result.getMsgId());
+            keyValue.put("topic", result.getMessageQueue().getTopic());
+            keyValue.put("broker.name", result.getMessageQueue().getBrokerName());
+            keyValue.put("queue.id", result.getMessageQueue().getQueueId());
+            keyValue.put("queue.offset", result.getQueueOffset());
+            keyValue.put("transaction.id", result.getTransactionId());
+            keyValue.put("offset.msg.id", result.getOffsetMsgId());
+            keyValue.put("region.id", result.getRegionId());
+        }
+        sourceTask.commit(preTransformRecord, keyValue == null ? null : keyValue.getProperties());
+    }
+
+
+    /**
+     * Convert the source record into a producer record.
+     */
+    protected Message convertTransformedRecord(final String topic, ConnectRecord record) {
+        if (record == null) {
+            return null;
+        }
+        Message sourceMessage = new Message();
+        sourceMessage.setTopic(topic);
+        // converter
+        if (recordConverter == null) {
+            final byte[] messageBody = JSON.toJSONString(record, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteMapNullValue).getBytes();
+            if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
+                log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
+            }
+            sourceMessage.setBody(messageBody);
+        } else {
+            byte[] messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(topic, record.getSchema(), record.getData()),
+                    ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
+            if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
+                log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
+            }
+            sourceMessage.setBody(messageBody);
+        }
+
+        if (retryWithToleranceOperator.failed()) {
+            return null;
+        }
+        // put extend msg property
+        putExtendMsgProperty(record, sourceMessage, topic);
+        return sourceMessage;
+    }
+
+    /**
+     * maybe create and get topic
+     *
+     * @param record
+     * @return
+     */
+    private String maybeCreateAndGetTopic(ConnectRecord record) {
+        String topic = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+        if (StringUtils.isBlank(topic)) {
+            RecordPosition recordPosition = record.getPosition();
+            if (null == recordPosition) {
+                log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
+            }
+            RecordPartition partition = recordPosition.getPartition();
+            if (null == partition) {
+                log.error("connect-topicname config is null and partition is null , lack of topic config");
+            }
+            Map<String, ?> partitionMap = partition.getPartition();
+            if (null == partitionMap) {
+                log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
+            }
+            Object o = partitionMap.get(TOPIC);
+            if (null == o) {
+                log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
+            }
+            topic = (String) o;
+        }
+        if (StringUtils.isBlank(topic)) {
+            throw new ConnectException("source connect lack of topic config");
+        }
+        if (ConnectUtil.isTopicExist(workerConfig, topic)) {
+            ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
+        }
+        return topic;
     }
 
     private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
@@ -399,7 +427,6 @@ public class WorkerSourceTask implements WorkerTask {
             log.info("extension keySet null.");
             return;
         }
-
         for (String key : keySet) {
             if (WHITE_KEY_SET.contains(key)) {
                 MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
@@ -410,42 +437,182 @@ public class WorkerSourceTask implements WorkerTask {
 
     }
 
+    /**
+     * initinalize and start
+     */
     @Override
-    public WorkerTaskState getState() {
-        return this.state.get();
+    protected void initializeAndStart() {
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            log.error("{} Source task producer start failed!!", this);
+            throw new ConnectException(e);
+        }
+        sourceTask.init(sourceTaskContext);
+        sourceTask.start(taskConfig);
+        log.info("{} Source task finished initialization and start", this);
     }
 
-    @Override
-    public String getConnectorName() {
-        return connectorName;
+
+    protected void recordPollReturned(int numRecordsInBatch) {
+        connectStatsManager.incSourceRecordPollTotalNums(numRecordsInBatch);
+        connectStatsManager.incSourceRecordPollNums(id().toString()+"", numRecordsInBatch);
     }
 
+    /**
+     * execute poll and send record
+     */
     @Override
-    public ConnectKeyValue getTaskConfig() {
-        return taskConfig;
+    protected void execute() {
+        while (isRunning()) {
+
+            updateCommittableOffsets();
+            if (CollectionUtils.isEmpty(toSendRecord)) {
+                try {
+                    prepareToPollTask();
+                    toSendRecord = poll();
+                    if (null != toSendRecord && toSendRecord.size() > 0) {
+                        recordPollReturned(toSendRecord.size());
+                    }
+                    if (toSendRecord == null) {
+                        continue;
+                    }
+                    log.trace("{} About to send {} records to RocketMQ", this, toSendRecord.size());
+                    if (!sendRecord()) {
+                        stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    }
+                } catch (InterruptedException e) {
+                    // Ignore and allow to exit.
+                } catch (Exception e) {
+                    try {
+                        finalOffsetCommit(true);
+                    } catch (Exception offsetException) {
+                        log.error("Failed to commit offsets for already-failing task", offsetException);
+                    }
+                    throw e;
+                } finally {
+                    finalOffsetCommit(false);
+                    // record source poll times
+                    connectStatsManager.incSourceRecordPollTotalTimes();
+                }
+            }
+            AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(id().toString());
+            if (null != atomicLong) {
+                atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size());
+            }
+        }
     }
 
-    @Override
-    public void timeout() {
-        this.state.set(WorkerTaskState.ERROR);
+    protected void finalOffsetCommit(boolean b) {
+
+        offsetManagement.awaitAllMessages(
+                workerConfig.getOffsetCommitTimeoutMsConfig(),
+                TimeUnit.MILLISECONDS
+        );
+        updateCommittableOffsets();
+        commitOffsets();
     }
 
-    @Override
-    public String toString() {
+    public boolean commitOffsets() {
+        long commitTimeoutMs = workerConfig.getOffsetCommitTimeoutMsConfig();
+        log.debug("{} Committing offsets", this);
+
+        long started = System.currentTimeMillis();
+        long timeout = started + commitTimeoutMs;
+
+        RecordOffsetManagement.CommittableOffsets offsetsToCommit;
+        synchronized (this) {
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
+        }
+
+        if (committableOffsets.isEmpty()) {
+            log.debug("{} Either no records were produced by the task since the last offset commit, "
+                            + "or every record has been filtered out by a transformation "
+                            + "or dropped due to transformation or conversion errors.",
+                    this
+            );
+            // We continue with the offset commit process here instead of simply returning immediately
+            // in order to invoke SourceTask::commit and record metrics for a successful offset commit
+        } else {
+            log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
+            if (committableOffsets.hasPending()) {
+                log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                                + "The source partition with the most pending messages is {}, with {} pending messages",
+                        this,
+                        committableOffsets.numUncommittableMessages(),
+                        committableOffsets.numDeques(),
+                        committableOffsets.largestDequePartition(),
+                        committableOffsets.largestDequeSize()
+                );
+            } else {
+                log.debug("{} There are currently no pending messages for this offset commit; "
+                                + "all messages dispatched to the task's producer since the last commit have been acknowledged",
+                        this
+                );
+            }
+        }
+
+        // write offset
+        offsetsToCommit.offsets().forEach(positionStorageWriter::writeOffset);
+
+        // begin flush
+        if (!positionStorageWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still mark a successful offset commit.
+            long durationMillis = System.currentTimeMillis() - started;
+            log.debug("{} Finished offset commitOffsets successfully in {} ms",
+                    this, durationMillis);
+            commitSourceTask();
+            return true;
+        }
+
+        Future<Void> flushFuture = positionStorageWriter.doFlush((error, key, result) -> {
+            if (error != null) {
+                log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+            } else {
+                log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
+            }
+        });
+        try {
+            flushFuture.get(Math.max(timeout - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("{} Flush of offsets interrupted, cancelling", this);
+            positionStorageWriter.cancelFlush();
+            return false;
+        } catch (ExecutionException e) {
+            log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
+            positionStorageWriter.cancelFlush();
+            return false;
+        } catch (TimeoutException e) {
+            log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
+            positionStorageWriter.cancelFlush();
+            return false;
+        }
 
-        StringBuilder sb = new StringBuilder();
-        sb.append("connectorName:" + connectorName)
-                .append("\nConfigs:" + JSON.toJSONString(taskConfig))
-                .append("\nState:" + state.get().toString());
-        return sb.toString();
+        long durationMillis = System.currentTimeMillis() - started;
+        log.debug("{} Finished commitOffsets successfully in {} ms",
+                this, durationMillis);
+        commitSourceTask();
+        return true;
     }
 
-    @Override
-    public Object getJsonObject() {
-        HashMap obj = new HashMap<String, Object>();
-        obj.put("connectorName", connectorName);
-        obj.put("configs", JSON.toJSONString(taskConfig));
-        obj.put("state", state.get().toString());
-        return obj;
+    protected void commitSourceTask() {
+        try {
+            this.sourceTask.commit();
+        } catch (Throwable t) {
+            log.error("{} Exception thrown while calling task.commit()", this, t);
+        }
     }
+
+
+    private void inWriteRecordFail() {
+        connectStatsManager.incSourceRecordWriteTotalFailNums();
+        connectStatsManager.incSourceRecordWriteFailNums(id().toString());
+    }
+
+    private void incWriteRecordStat() {
+        connectStatsManager.incSourceRecordWriteTotalNums();
+        connectStatsManager.incSourceRecordWriteNums(id().toString());
+    }
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java
new file mode 100644
index 0000000..5ab6504
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+
+/**
+ * worker source task context
+ */
+public class WorkerSourceTaskContext implements SourceTaskContext {
+
+    private final OffsetStorageReader reader;
+    private final WorkerSourceTask task;
+    private final ConnectKeyValue keyValue;
+
+    public WorkerSourceTaskContext(OffsetStorageReader reader,
+                                   WorkerSourceTask task,
+                                   ConnectKeyValue config) {
+        this.reader = reader;
+        this.task = task;
+        this.keyValue = config;
+    }
+
+    @Override
+    public KeyValue configs() {
+        return this.keyValue;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+
+    @Override
+    public String getConnectorName() {
+        return task.id().connector();
+    }
+
+    @Override
+    public String getTaskName() {
+        return task.id().task() + "";
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
index 52bd26d..9ffd1c2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java
@@ -15,24 +15,192 @@
  *  limitations under the License.
  */
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.apache.rocketmq.connect.runtime.utils.CurrentTaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Should we use callable here ?
  */
-public interface WorkerTask extends Runnable {
+public abstract class WorkerTask implements Runnable {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
+    private static final String THREAD_NAME_PREFIX = "task-thread-";
+
+    protected  final ConnectConfig workerConfig;
+
+    protected final ConnectorTaskId id;
+    protected final ClassLoader loader;
+    protected final ConnectKeyValue taskConfig;
+    /**
+     * Atomic state variable
+     */
+    protected AtomicReference<WorkerTaskState> state;
+    /**
+     * worker state
+     */
+    protected final AtomicReference<WorkerState> workerState;
+    protected final RetryWithToleranceOperator retryWithToleranceOperator;
+    protected final TransformChain<ConnectRecord> transformChain;
+
+
+    public WorkerTask(ConnectConfig workerConfig, ConnectorTaskId id, ClassLoader loader, ConnectKeyValue taskConfig, RetryWithToleranceOperator retryWithToleranceOperator, TransformChain<ConnectRecord> transformChain, AtomicReference<WorkerState> workerState) {
+        this.workerConfig = workerConfig;
+        this.id = id;
+        this.loader = loader;
+        this.taskConfig = taskConfig;
+        this.state = new AtomicReference<>(WorkerTaskState.NEW);
+        this.workerState = workerState;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.transformChain = transformChain;
+        this.transformChain.retryWithToleranceOperator(this.retryWithToleranceOperator);
+    }
+
+    public ConnectorTaskId id() {
+        return id;
+    }
+
+    public ClassLoader loader() {
+        return loader;
+    }
+
+    /**
+     * Initialize the task for execution.
+     *
+     * @param taskConfig initial configuration
+     */
+    protected void initialize(ConnectKeyValue taskConfig) {
+        // NO-op
+    }
+
+    /**
+     * initinalize and start
+     */
+    protected abstract void initializeAndStart();
+
+    public void doInitializeAndStart() {
+        state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
+        initializeAndStart();
+        state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
+    }
+
+    /**
+     * execute poll and send record
+     */
+    protected abstract void execute();
+
+    private void doExecute() {
+        execute();
+    }
+
+    /**
+     * get state
+     *
+     * @return
+     */
+    public WorkerTaskState getState() {
+        return this.state.get();
+    }
+
+    protected boolean isRunning() {
+        return WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get();
+    }
+
+    protected boolean isStopping() {
+        return !isRunning();
+    }
+
+    /**
+     * close resources
+     */
+    protected abstract void close();
+
+    public void doClose() {
+        try {
+            state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
+            close();
+            state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
+        } catch (Throwable t) {
+            log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, t);
+            throw t;
+        }
+    }
+
+    /**
+     * clean up
+     */
+    public void cleanup() {
+        log.info("Cleaning a task, current state {}, destination state {}", state.get().name(), WorkerTaskState.TERMINATED.name());
+        if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED)
+                || state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) {
+            log.info("Cleaning a task success");
+        } else {
+            log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state");
+        }
+    }
+
+
+    public ConnectKeyValue currentTaskConfig() {
+        return taskConfig;
+    }
 
-    public  WorkerTaskState getState();
+    /**
+     * current task state
+     *
+     * @return
+     */
+    public CurrentTaskState currentTaskState() {
+        return new CurrentTaskState(id().connector(), taskConfig, state.get());
+    }
 
-    public void stop();
 
-    public void cleanup();
+    private void doRun() throws InterruptedException {
+        try {
+            doInitializeAndStart();
+            // while poll
+            doExecute();
+        } catch (Throwable t) {
+            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
+            throw t;
+        } finally {
+            doClose();
+        }
+    }
 
-    public String getConnectorName();
+    /**
+     * do execute data
+     */
+    @Override
+    public void run() {
+        String savedName = Thread.currentThread().getName();
+        try {
+            Thread.currentThread().setName(THREAD_NAME_PREFIX + id);
+            doRun();
+        } catch (Throwable t) {
+            onFailure(t);
+            throw (Error) t;
+        } finally {
+            Thread.currentThread().setName(savedName);
+        }
+    }
 
-    public ConnectKeyValue getTaskConfig();
+    public void onFailure(Throwable t) {
+        synchronized (this) {
+            state.set(WorkerTaskState.ERROR);
+        }
+    }
 
-    public Object getJsonObject();
+    public void timeout() {
+        log.error("Worker task stop is timeout !!!");
+        onFailure(null);
+    }
 
-    public void timeout();
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
index 2125fc6..89cd15a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
@@ -48,6 +48,12 @@ public interface ErrorReporter extends AutoCloseable {
          */
         TASK_POLL,
 
+        /**
+         * rocketmq produce
+         */
+        ROCKETMQ_PRODUCE,
+
+
         /**
          * sink put
          */
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
index b0fa283..e331232 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -75,6 +75,21 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         }
     }
 
+    public synchronized void executeFailed(ErrorReporter.Stage stage,
+                                           Class<?> executingClass,
+                                           ConnectRecord sourceRecord,
+                                           Throwable error) {
+        markAsFailed();
+        context.sourceRecord(sourceRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        context.report();
+        if (!withinToleranceLimits()) {
+            throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+        }
+    }
+
+
     /**
      * Execute the recoverable operation. If the operation is already in a failed state, then simply return
      * with the existing failure.
@@ -232,6 +247,9 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         return this.context.error();
     }
 
+    public ToleranceType getErrorToleranceType() {
+        return toleranceType;
+    }
 
     @Override
     public void close() {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 58e4e7e..1eda51f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -239,7 +239,7 @@ public class RestHandler {
     private Set<Object> convertWorkerTaskToString(Set<Runnable> tasks) {
         Set<Object> result = new HashSet<>();
         for (Runnable task : tasks) {
-            result.add(((WorkerTask) task).getJsonObject());
+            result.add(((WorkerTask) task).currentTaskState());
         }
         return result;
     }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
new file mode 100644
index 0000000..45f641c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Interface for config manager. Contains connector configs and task configs. All worker in a cluster should keep the
+ * same configs.
+ */
+public abstract class AbstractConfigManagementService implements ConfigManagementService {
+    /**
+     * Current connector configs in the store.
+     */
+    protected KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+
+    @Override
+    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
+        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+        ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
+        boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
+        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
+        List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
+        int taskId = 0;
+        for (KeyValue keyValue : taskConfigs) {
+            ConnectKeyValue newKeyValue = new ConnectKeyValue();
+            for (String key : keyValue.keySet()) {
+                newKeyValue.put(key, keyValue.getString(key));
+            }
+            if (directEnable) {
+                newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
+                newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
+                newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
+            }
+            // put task id
+            newKeyValue.put(RuntimeConfigDefine.TASK_ID, taskId);
+            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
+            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            Set<String> connectConfigKeySet = configs.keySet();
+            for (String connectConfigKey : connectConfigKeySet) {
+                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+                }
+            }
+            converterdConfigs.add(newKeyValue);
+            taskId++;
+        }
+        putTaskConfigs(connectorName, converterdConfigs);
+    }
+
+    protected abstract void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs);
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 0ee1655..1461f5e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -46,13 +46,13 @@ import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallba
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConfigManagementServiceImpl implements ConfigManagementService {
+public class ConfigManagementServiceImpl extends AbstractConfigManagementService {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    /**
-     * Current connector configs in the store.
-     */
-    private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+//    /**
+//     * Current connector configs in the store.
+//     */
+//    private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
 
     /**
      * Current task configs in the store.
@@ -196,35 +196,7 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
 
     @Override
     public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
-        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
-        ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
-        boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
-        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
-        List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
-        for (KeyValue keyValue : taskConfigs) {
-            ConnectKeyValue newKeyValue = new ConnectKeyValue();
-            for (String key : keyValue.keySet()) {
-                newKeyValue.put(key, keyValue.getString(key));
-            }
-            if (directEnable) {
-                newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
-                newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
-                newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
-            }
-            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
-            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
-            Set<String> connectConfigKeySet = configs.keySet();
-            for (String connectConfigKey : connectConfigKeySet) {
-                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
-                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
-                }
-            }
-            converterdConfigs.add(newKeyValue);
-        }
-        putTaskConfigs(connectorName, converterdConfigs);
+        super.recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
         sendSynchronizeConfig();
         triggerListener();
     }
@@ -260,7 +232,8 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
         return result;
     }
 
-    private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+    @Override
+    protected void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
 
         List<ConnectKeyValue> exist = taskKeyValueStore.get(connectorName);
         if (null != exist && exist.size() > 0) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 11e35e5..c372d68 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -127,7 +127,6 @@ public class PositionManagementServiceImpl implements PositionManagementService
 
     @Override
     public RecordOffset getPosition(ExtendRecordPartition partition) {
-
         return positionStore.get(partition);
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 0668c37..fa5425b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -23,8 +23,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.AbstractConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
@@ -40,14 +39,9 @@ import java.util.Map;
 /**
  * memory config management service impl for standalone
  */
-public class MemoryConfigManagementServiceImpl implements ConfigManagementService {
+public class MemoryConfigManagementServiceImpl extends AbstractConfigManagementService {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    /**
-     * Current connector configs in the store.
-     */
-    private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
-
     /**
      * Current task configs in the store.
      */
@@ -63,7 +57,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
     public MemoryConfigManagementServiceImpl() {
     }
 
-    @Override public void initialize(ConnectConfig connectConfig, Plugin plugin) {
+    @Override
+    public void initialize(ConnectConfig connectConfig, Plugin plugin) {
         this.connectorKeyValueStore = new MemoryBasedKeyValueStore<>();
         this.taskKeyValueStore = new MemoryBasedKeyValueStore<>();
         this.plugin = plugin;
@@ -155,35 +150,7 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
 
     @Override
     public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, ConnectKeyValue configs) {
-        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
-        ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
-        boolean directEnable = Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
-        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
-        List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
-        for (KeyValue keyValue : taskConfigs) {
-            ConnectKeyValue newKeyValue = new ConnectKeyValue();
-            for (String key : keyValue.keySet()) {
-                newKeyValue.put(key, keyValue.getString(key));
-            }
-            if (directEnable) {
-                newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
-                newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
-                newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS, connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
-            }
-            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
-            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
-            Set<String> connectConfigKeySet = configs.keySet();
-            for (String connectConfigKey : connectConfigKeySet) {
-                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
-                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
-                }
-            }
-            converterdConfigs.add(newKeyValue);
-        }
-        putTaskConfigs(connectorName, converterdConfigs);
+        super.recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
         triggerListener();
     }
 
@@ -214,7 +181,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
         return result;
     }
 
-    private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+    @Override
+    protected void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
         List<ConnectKeyValue> exist = taskKeyValueStore.get(connectorName);
         if (null != exist && exist.size() > 0) {
             taskKeyValueStore.remove(connectorName);
@@ -245,7 +213,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
         return this.plugin;
     }
 
-    @Override public StagingMode getStagingMode() {
+    @Override
+    public StagingMode getStagingMode() {
         return StagingMode.STANDALONE;
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
index c5fa1fa..5b91217 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
@@ -21,23 +21,173 @@ package org.apache.rocketmq.connect.runtime.store;
 
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.storage.OffsetStorageWriter;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 /**
  * position storage writer
  */
-public class PositionStorageWriter {
+public class PositionStorageWriter implements OffsetStorageWriter, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     private PositionManagementService positionManagementService;
     private final String namespace;
 
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    /**
+     * Offset data in Connect format
+     */
+    private Map<ExtendRecordPartition, RecordOffset> data = new HashMap<>();
+    private Map<ExtendRecordPartition, RecordOffset> toFlush = null;
+
+    // Unique ID for each flush request to handle callbacks after timeouts
+    private long currentFlushId = 0;
+
     public PositionStorageWriter(String namespace, PositionManagementService positionManagementService) {
         this.namespace = namespace;
         this.positionManagementService = positionManagementService;
     }
 
-    public void putPosition(RecordPartition partition, RecordOffset position) {
+    @Override
+    public void writeOffset(RecordPartition partition, RecordOffset position) {
         ExtendRecordPartition extendRecordPartition = new ExtendRecordPartition(namespace, partition.getPartition());
-        positionManagementService.putPosition(extendRecordPartition, position);
+        data.put(extendRecordPartition, position);
+    }
+
+    /**
+     * write offsets
+     *
+     * @param positions positions
+     */
+    @Override
+    public void writeOffset(Map<RecordPartition, RecordOffset> positions) {
+        for (Map.Entry<RecordPartition, RecordOffset> offset : positions.entrySet()) {
+            writeOffset(offset.getKey(), offset.getValue());
+        }
+    }
+
+
+    private boolean isFlushing() {
+        return toFlush != null;
+    }
+
+    /**
+     * begin flush offset
+     *
+     * @return
+     */
+    public synchronized boolean beginFlush() {
+        if (isFlushing()) {
+            throw new ConnectException("PositionStorageWriter is already flushing");
+        }
+        if (data.isEmpty()) {
+            return false;
+        }
+        this.toFlush = this.data;
+        this.data = new HashMap<>();
+        return true;
+    }
+
+    /**
+     * do flush offset
+     *
+     * @param callback
+     */
+    public Future doFlush(final DataSynchronizerCallback callback) {
+        final long flushId = currentFlushId;
+        return sendOffsetFuture(callback, flushId);
+    }
+
+    /**
+     * Cancel a flush that has been initiated by {@link #beginFlush}.
+     */
+    public synchronized void cancelFlush() {
+        if (isFlushing()) {
+            // rollback to inited
+            toFlush.putAll(data);
+            data = toFlush;
+            currentFlushId++;
+            toFlush = null;
+        }
+    }
+
+    private Future<Void> sendOffsetFuture(DataSynchronizerCallback callback, long flushId) {
+        FutureTask<Void> futureTask = new FutureTask<Void>(new SendOffsetCallback(callback, flushId));
+        executorService.submit(futureTask);
+        return futureTask;
+    }
+
+    /**
+     * Closes this stream and releases any system resources associated
+     * with it. If the stream is already closed then invoking this
+     * method has no effect.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    @Override
+    public void close() throws IOException {
+        if (executorService != null) {
+            executorService.shutdown();
+        }
     }
+
+
+    /**
+     * send offset callback
+     */
+    private class SendOffsetCallback implements Callable<Void> {
+        DataSynchronizerCallback callback;
+        long flushId;
+
+        public SendOffsetCallback(DataSynchronizerCallback callback, long flushId) {
+            this.callback = callback;
+            this.flushId = flushId;
+        }
+
+        /**
+         * Computes a result, or throws an exception if unable to do so.
+         *
+         * @return computed result
+         * @throws Exception if unable to compute a result
+         */
+        @Override
+        public Void call() throws Exception {
+            try {
+                // has been canceled
+                if (flushId != currentFlushId) {
+                    return null;
+                }
+                positionManagementService.putPosition(toFlush);
+                log.debug("Submitting {} entries to backing store. The offsets are: {}", data.size(), toFlush);
+                positionManagementService.persist();
+                positionManagementService.synchronize();
+                // persist finished
+                toFlush = null;
+                currentFlushId++;
+            } catch (Throwable throwable) {
+                // rollback
+                cancelFlush();
+                this.callback.onCompletion(throwable, null, null);
+            }
+            return null;
+        }
+    }
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 5f7ad28..a6902cc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -288,18 +288,18 @@ public class ConnectUtil {
         return recordPartition;
     }
 
-    public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue, Integer taskId) {
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, ConnectorTaskId  id, ConnectKeyValue keyValue) {
         RPCHook rpcHook = null;
         if (connectConfig.getAclEnable()) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
         }
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
-        consumer.setInstanceName(createInstance(connectorName.concat("-").concat(taskId.toString())));
+        consumer.setInstanceName(id.toString());
         String taskGroupId = keyValue.getString("task-group-id");
         if (StringUtils.isNotBlank(taskGroupId)) {
             consumer.setConsumerGroup(taskGroupId);
         } else {
-            consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + connectorName);
+            consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + id.connector());
         }
         if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
             consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java
new file mode 100644
index 0000000..25e6dc5
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectorTaskId.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.utils;
+
+import com.alibaba.fastjson.annotation.JSONCreator;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
+ * the connector.
+ */
+public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
+    private final String connector;
+    private final int task;
+
+    @JSONCreator
+    public ConnectorTaskId(@JSONField(name = "connector") String connector, @JSONField(name = "task") int task) {
+        this.connector = connector;
+        this.task = task;
+    }
+
+    @JSONField(name = "connector")
+    public String connector() {
+        return connector;
+    }
+
+    @JSONField
+    public int task() {
+        return task;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectorTaskId that = (ConnectorTaskId) o;
+
+        if (task != that.task)
+            return false;
+
+        return Objects.equals(connector, that.connector);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connector != null ? connector.hashCode() : 0;
+        result = 31 * result + task;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return connector + '-' + task;
+    }
+
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = connector.compareTo(o.connector);
+        if (connectorCmp != 0)
+            return connectorCmp;
+        return Integer.compare(task, o.task);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java
new file mode 100644
index 0000000..401db98
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/CurrentTaskState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTaskState;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * current task state
+ */
+public class CurrentTaskState implements Serializable {
+
+    private String connectorName;
+    private Map<String, String> config;
+    private WorkerTaskState taskState;
+
+
+    public CurrentTaskState(String connector, ConnectKeyValue connectKeyValue, WorkerTaskState taskState) {
+        this.connectorName = connector;
+        this.config = connectKeyValue.getProperties();
+        this.taskState =  taskState;
+    }
+
+    public String getConnectorName() {
+        return connectorName;
+    }
+
+    public void setConnectorName(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    public WorkerTaskState getTaskState() {
+        return taskState;
+    }
+
+    public void setTaskState(WorkerTaskState taskState) {
+        this.taskState = taskState;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof CurrentTaskState)) return false;
+        CurrentTaskState that = (CurrentTaskState) o;
+        return Objects.equals(connectorName, that.connectorName) && Objects.equals(config, that.config) && taskState == that.taskState;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(connectorName, config, taskState);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 7931eac..529d1ba 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
 import org.junit.After;
@@ -118,8 +119,10 @@ public class WorkerTest {
             RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
             retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-" + i, connectKeyValue));
 
-            runnables.add(new WorkerSourceTask("TEST-CONN-" + i,
+            runnables.add(new WorkerSourceTask(new ConnectConfig(),
+                    new ConnectorTaskId("TEST-CONN-" + i,i),
                 new TestSourceTask(),
+                null,
                 connectKeyValue,
                 new TestPositionManageServiceImpl(),
                 new JsonConverter(),
@@ -189,7 +192,7 @@ public class WorkerTest {
             } else {
                 workerSinkTask = (WorkerSinkTask) runnable;
             }
-            String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName();
+            String connectorName = null != workerSourceTask ? workerSourceTask.id().connector() : workerSinkTask.id().connector();
             assertThat(connectorName).isIn("TEST-CONN-0", "TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3");
         }
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 098601b..f82fcb2 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.junit.Before;
 import org.junit.Test;
@@ -212,13 +213,13 @@ public class RestHandlerTest {
         RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
 
-        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
 
         // create retry operator
         RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
         retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
 
-        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
+        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
         workerTasks = new HashSet<Runnable>() {
             {
                 add(workerSourceTask1);