Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/01 07:29:26 UTC

[rocketmq-connect] branch master updated: [ISSUE #14] [Part 3] Optimize work sink task (#15)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ba1c3e1  [ISSUE #14] [Part 3] Optimize work sink task (#15)
ba1c3e1 is described below

commit ba1c3e174f710a53a503f514a3094ef037a95ae9
Author: zhoubo <87...@qq.com>
AuthorDate: Fri Apr 1 15:29:23 2022 +0800

    [ISSUE #14] [Part 3] Optimize work sink task (#15)
    
    * Optimize the worker sink task:
    Optimize task sharding
    Pause message pulling after repeated pull errors
    Optimize offset committing
    Support resetting offsets
    Use the pull consumer to pull messages
    Save offsets via the consumer offset store instead of syncing them to the RocketMQ offset manager
    Optimize try/catch handling
    
    * Fix duplicate load connector API
    
    * Add try/catch around rebalance handling
---
 rocketmq-connect-runtime/pom.xml                   |   5 +
 .../connect/runtime/common/QueueState.java         |  23 ++
 .../connect/runtime/config/ConnectConfig.java      |  10 +
 .../runtime/config/SinkConnectorConfig.java        |  50 +++
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 430 +++++++++++----------
 .../connectorwrapper/WorkerSinkTaskContext.java    | 195 ++++++++++
 6 files changed, 502 insertions(+), 211 deletions(-)
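
The pull-error pause noted in the commit message above is implemented in WorkerSinkTask (see its diff below) with a consecutive-error counter and a CountDownLatch that is used purely as an interruptible sleep. The following editorial sketch illustrates that pattern; the constants mirror the diff, while the wrapper class and method names are hypothetical.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PullBackoffSketch {
    // Same values as WorkerSinkTask in the diff below.
    private static final long PULL_MSG_ERROR_BACKOFF_MS = 1000 * 10;
    private static final long PULL_MSG_ERROR_THRESHOLD = 16;

    // Never counted down during normal operation, so await() just blocks
    // for the backoff interval (or until the task is interrupted).
    private final CountDownLatch stopPullMsgLatch = new CountDownLatch(1);
    private long pullMsgErrorCount = 0;

    void onPullFailed() {
        pullMsgErrorCount++;
    }

    void onPullSucceeded() {
        pullMsgErrorCount = 0;
    }

    // Called before each pull attempt, like shouldStopPullMsg() in the diff below.
    void maybeBackoff() throws InterruptedException {
        if (pullMsgErrorCount == PULL_MSG_ERROR_THRESHOLD) {
            stopPullMsgLatch.await(PULL_MSG_ERROR_BACKOFF_MS, TimeUnit.MILLISECONDS);
            pullMsgErrorCount = 0;
        }
    }
}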

diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 51d32f5..6571673 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -240,6 +240,11 @@
             <artifactId>reflections</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/QueueState.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/QueueState.java
new file mode 100644
index 0000000..154d11b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/QueueState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.connect.runtime.common;
+
+public enum QueueState {
+    PAUSE;
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
index 3620e34..5984b6d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java
@@ -18,12 +18,22 @@
 package org.apache.rocketmq.connect.runtime.config;
 
 import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_RUNTIME;
 
 /**
  * Configurations for runtime.
  */
 public class ConnectConfig {
 
+    private static final Logger log = LoggerFactory.getLogger(ROCKETMQ_RUNTIME);
+
+    public static final String COMMA = ",";
+
+    public static final String SEMICOLON = ";";
+
     /**
      * The unique ID of each worker instance in the cluster
      */
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
new file mode 100644
index 0000000..f533093
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.connect.runtime.config;
+
+import com.google.common.base.Splitter;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+
+public class SinkConnectorConfig extends ConnectConfig {
+
+
+
+    public static Set<String> parseTopicList(ConnectKeyValue taskConfig) {
+        String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+        if (StringUtils.isBlank(messageQueueStr)) {
+            return null;
+        }
+        List<String> topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+        return new HashSet<>(topicList);
+    }
+
+    public static MessageQueue parseMessageQueueList(String messageQueueStr) {
+        List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
+        if (CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) {
+            return null;
+        }
+        return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
+    }
+}
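
As an editorial aside, here is a usage sketch for the two helpers above. It assumes ConnectKeyValue supports the standard KeyValue put(String, String) and that RuntimeConfigDefine.CONNECT_TOPICNAME holds a semicolon-separated topic list; the demo class itself is hypothetical.

import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;

class SinkConnectorConfigDemo {
    public static void main(String[] args) {
        ConnectKeyValue taskConfig = new ConnectKeyValue();
        taskConfig.put(RuntimeConfigDefine.CONNECT_TOPICNAME, "TopicA;TopicB");

        // ["TopicA", "TopicB"] - entries are trimmed, blanks dropped, and null is
        // returned if the config key is missing.
        Set<String> topics = SinkConnectorConfig.parseTopicList(taskConfig);

        // A "topic;brokerName;queueId" triple becomes a MessageQueue; anything that
        // does not split into exactly three parts yields null.
        MessageQueue queue = SinkConnectorConfig.parseMessageQueueList("TopicA;broker-a;0");

        System.out.println(topics + " " + queue);
    }
}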
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 8714ac8..0499534 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -20,38 +20,45 @@ package org.apache.rocketmq.connect.runtime.connectorwrapper;
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Converter;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import io.openmessaging.internal.DefaultKeyValue;
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.common.QueueState;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
 import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
@@ -94,7 +101,6 @@ public class WorkerSinkTask implements WorkerTask {
      */
     private ConnectKeyValue taskConfig;
 
-
     /**
      * Atomic state variable
      */
@@ -104,8 +110,6 @@ public class WorkerSinkTask implements WorkerTask {
      * Stop retry limit
      */
 
-
-
     /**
      * A RocketMQ consumer to pull message from MQ.
      */
@@ -138,8 +142,20 @@ public class WorkerSinkTask implements WorkerTask {
 
     private long nextCommitTime = 0;
 
+    private Set<RecordPartition> recordPartitions = new CopyOnWriteArraySet<>();
+
+    private long pullMsgErrorCount = 0;
+
+    private static final long PULL_MSG_ERROR_BACKOFF_MS = 1000 * 10;
+
+    private static final long PULL_MSG_ERROR_THRESHOLD = 16;
+
     private final AtomicReference<WorkerState> workerState;
 
+    private final CountDownLatch stopPullMsgLatch;
+
+    private WorkerSinkTaskContext sinkTaskContext;
+
     private final TransformChain<ConnectRecord> transformChain;
 
     public static final String BROKER_NAME = "brokerName";
@@ -166,6 +182,7 @@ public class WorkerSinkTask implements WorkerTask {
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
         this.state = new AtomicReference<>(WorkerTaskState.NEW);
         this.workerState = workerState;
+        this.stopPullMsgLatch = new CountDownLatch(1);
         this.transformChain = transformChain;
     }
 
@@ -175,194 +192,32 @@ public class WorkerSinkTask implements WorkerTask {
     @Override
     public void run() {
         try {
+            registTopics();
             consumer.start();
-            log.info("Sink task consumer start.");
+            log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig));
             state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING);
             sinkTask.init(taskConfig);
-            String topicNamesStr = taskConfig.getString(QUEUENAMES_CONFIG);
-            String topicQueuesStr = taskConfig.getString(TOPIC_QUEUES_CONFIG);
-
-            if (!StringUtils.isEmpty(topicNamesStr)) {
-                String[] topicNames = topicNamesStr.split(COMMA);
-                for (String topicName : topicNames) {
-                    final Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topicName);
-                    for (MessageQueue messageQueue : messageQueues) {
-                        final long offset = consumer.searchOffset(messageQueue, TIMEOUT);
-                        messageQueuesOffsetMap.put(messageQueue, offset);
-                    }
-                    messageQueues.addAll(messageQueues);
-                }
-                log.debug("{} Initializing and starting task for topicNames {}", this, topicNames);
-            } else if (!StringUtils.isEmpty(topicQueuesStr)) {
-                String[] topicQueues = topicQueuesStr.split(SEMICOLON);
-                for (String messageQueueStr : topicQueues) {
-                    String[] items = messageQueueStr.split(COMMA);
-                    if (items.length != 3) {
-                        log.error("Topic queue format error, topicQueueStr : " + topicNamesStr);
-                        return;
-                    }
-                    MessageQueue messageQueue = new MessageQueue(items[0], items[1], Integer.valueOf(items[2]));
-                    final long offset = consumer.searchOffset(messageQueue, TIMEOUT);
-                    messageQueuesOffsetMap.put(messageQueue, offset);
-                }
-            } else {
-                log.error("Lack of sink comsume topicNames config");
-                state.set(WorkerTaskState.ERROR);
-                return;
-            }
-
-            for (Map.Entry<MessageQueue, Long> entry : messageQueuesOffsetMap.entrySet()) {
-                MessageQueue messageQueue = entry.getKey();
-                RecordOffset recordOffset = offsetStorageReader.readOffset(ConnectUtil.convertToRecordPartition(messageQueue));
-                if (null != recordOffset) {
-                    messageQueuesOffsetMap.put(messageQueue, ConnectUtil.convertToOffset(recordOffset));
-                }
-            }
-
-            sinkTask.start(new SinkTaskContext() {
-                @Override
-                public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
-                    if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
-                        log.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
-                        return;
-                    }
-                    String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
-                    String topic = (String) recordPartition.getPartition().get(TOPIC);
-                    Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
-                    if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                        log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
-                        return;
-                    }
-                    MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
-                    if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                        log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
-                        return;
-                    }
-                    Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
-                    if (null == offset) {
-                        log.warn("resetOffset, offset is null");
-                        return;
-                    }
-                    messageQueuesOffsetMap.put(messageQueue, offset);
-                    try {
-                        consumer.updateConsumeOffset(messageQueue, offset);
-                    } catch (MQClientException e) {
-                        log.warn("updateConsumeOffset MQClientException, messageQueue {}, offset {}", JSON.toJSONString(messageQueue), offset, e);
-                    }
-                }
-
-                @Override
-                public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
-                    if (MapUtils.isEmpty(offsets)) {
-                        log.warn("resetOffset, offsets {} is null", offsets);
-                        return;
-                    }
-                    for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
-                        if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
-                            log.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
-                            continue;
-                        }
-                        RecordPartition recordPartition = entry.getKey();
-                        String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
-                        String topic = (String) recordPartition.getPartition().get(TOPIC);
-                        Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
-                        if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                            log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
-                            continue;
-                        }
-                        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
-                        if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                            log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
-                            continue;
-                        }
-                        RecordOffset recordOffset = entry.getValue();
-                        Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
-                        if (null == offset) {
-                            log.warn("resetOffset, offset is null");
-                            continue;
-                        }
-                        messageQueuesOffsetMap.put(messageQueue, offset);
-                        try {
-                            consumer.updateConsumeOffset(messageQueue, offset);
-                        } catch (MQClientException e) {
-                            log.warn("updateConsumeOffset MQClientException, messageQueue {}, offset {}", JSON.toJSONString(messageQueue), entry.getValue(), e);
-                        }
-                    }
-                }
-
-                @Override
-                public void pause(List<RecordPartition> recordPartitions) {
-                    if (recordPartitions == null || recordPartitions.size() == 0) {
-                        log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
-                        return;
-                    }
-                    for (RecordPartition recordPartition : recordPartitions) {
-                        if (null == recordPartition || null == recordPartition.getPartition()) {
-                            log.warn("recordPartition {} info is null", recordPartition);
-                            continue;
-                        }
-                        String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
-                        String topic = (String) recordPartition.getPartition().get(TOPIC);
-                        Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
-                        if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                            log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
-                            continue;
-                        }
-                        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
-                        if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                            log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
-                            continue;
-                        }
-                        messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
-                    }
-                }
-
-                @Override
-                public void resume(List<RecordPartition> recordPartitions) {
-                    if (recordPartitions == null || recordPartitions.size() == 0) {
-                        log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
-                        return;
-                    }
-                    for (RecordPartition recordPartition : recordPartitions) {
-                        if (null == recordPartition || null == recordPartition.getPartition()) {
-                            log.warn("recordPartition {} info is null", recordPartition);
-                            continue;
-                        }
-                        String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
-                        String topic = (String) recordPartition.getPartition().get(TOPIC);
-                        Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
-                        if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
-                            log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
-                            continue;
-                        }
-                        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
-                        if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                            log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
-                            continue;
-                        }
-                        messageQueuesStateMap.remove(messageQueue);
-                    }
-                }
-
-                @Override public Set<RecordPartition> assignment() {
-                    return null;
-                }
-
-                @Override public String getConnectorName() {
-                    return taskConfig.getString("connectorName");
-                }
-
-                @Override public String getTaskName() {
-                    return taskConfig.getString("taskId");
-                }
-            });
+            this.sinkTaskContext = new WorkerSinkTaskContext(taskConfig, this, consumer);
+            sinkTask.start(sinkTaskContext);
             // we assume executed here means we are safe
             log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig));
             state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING);
 
             while (WorkerState.STARTED == workerState.get() && WorkerTaskState.RUNNING == state.get()) {
                 // this method can block up to 3 minutes long
-                pullMessageFromQueues();
+                try {
+                    preCommit(false);
+                    setQueueOffset();
+                    pullMessageFromQueues();
+                } catch (RetriableException e) {
+                    log.error("Sink task RetriableException exception", e);
+                } catch (InterruptedException e) {
+                    log.error("Sink task InterruptedException exception", e);
+                    throw e;
+                } catch (Throwable e) {
+                    state.set(WorkerTaskState.ERROR);
+                    log.error("sink task {}, pull message MQClientException, Error {} ", this, e.getMessage(), e);
+                }
             }
 
             sinkTask.stop();
@@ -380,39 +235,190 @@ public class WorkerSinkTask implements WorkerTask {
         }
     }
 
-    private void pullMessageFromQueues() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+    private void setQueueOffset() {
+        Map<MessageQueue, Long> messageQueueOffsetMap = this.sinkTaskContext.queuesOffsets();
+        if (org.apache.commons.collections4.MapUtils.isEmpty(messageQueueOffsetMap)) {
+            return;
+        }
+        for (Map.Entry<MessageQueue, Long> entry : messageQueueOffsetMap.entrySet()) {
+            if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
+                this.messageQueuesOffsetMap.put(entry.getKey(), entry.getValue());
+                try {
+                    consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+                } catch (MQClientException e) {
+                    log.warn("updateConsumeOffset MQClientException, messageQueue {}, offset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), e);
+                }
+            }
+        }
+        this.sinkTaskContext.cleanQueuesOffsets();
+    }
+
+    private void registTopics() {
+        Set<String> topics = SinkConnectorConfig.parseTopicList(taskConfig);
+        if (org.apache.commons.collections4.CollectionUtils.isEmpty(topics)) {
+            throw new ConnectException("sink connector topics config cannot be empty, please check the sink connector config info");
+        }
+        for (String topic : topics) {
+            consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
+                @Override
+                public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+                    log.info("messageQueueChanged, old messageQueuesOffsetMap {}", JSON.toJSONString(messageQueuesOffsetMap));
+                    WorkerSinkTask.this.preCommit(true);
+                    messageQueuesOffsetMap.forEach((key, value) -> {
+                        if (key.getTopic().equals(topic)) {
+                            messageQueuesOffsetMap.remove(key, value);
+                        }
+                    });
+
+                    Set<RecordPartition> waitRemoveQueueMetaDatas = new HashSet<>();
+                    recordPartitions.forEach(key -> {
+                        if (key.getPartition().get("topic").equals(topic)) {
+                            waitRemoveQueueMetaDatas.add(key);
+                        }
+                    });
+                    recordPartitions.removeAll(waitRemoveQueueMetaDatas);
+                    for (MessageQueue messageQueue : mqDivided) {
+                        messageQueuesOffsetMap.put(messageQueue, consumeFromOffset(messageQueue, taskConfig));
+                        RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(messageQueue);
+                        recordPartitions.add(recordPartition);
+                    }
+                    log.info("messageQueueChanged, new messageQueuesOffsetMap {}", JSON.toJSONString(messageQueuesOffsetMap));
+                }
+            });
+        }
+    }
+
+    public long consumeFromOffset(MessageQueue messageQueue, ConnectKeyValue taskConfig) {
+        //-1 when started
+        long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
+        if (offset < 0) {
+            //query from broker
+            offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+        }
+
+        String consumeFromWhere = taskConfig.getString("consume-from-where");
+        if (StringUtils.isBlank(consumeFromWhere)) {
+            consumeFromWhere = "CONSUME_FROM_LAST_OFFSET";
+        }
+
+        if (offset < 0) {
+            for (int i = 0; i < 3; i++) {
+                try {
+                    if (consumeFromWhere.equals("CONSUME_FROM_FIRST_OFFSET")) {
+                        offset = consumer.minOffset(messageQueue);
+                    } else {
+                        offset = consumer.maxOffset(messageQueue);
+                    }
+                    break;
+                } catch (MQClientException e) {
+                    log.error("get max offset MQClientException", e);
+                    if (i == 2) {
+                        throw new ConnectException("get max offset MQClientException", e);
+                    }
+                    continue;
+                }
+            }
+        }
+        //make sure
+        if (offset < 0) {
+            offset = 0;
+        }
+        return offset;
+    }
+
+    public void incPullTPS(String topic, int pullSize) {
+        consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+            .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+    }
+
+    private void pullMessageFromQueues() throws InterruptedException {
         long startTimeStamp = System.currentTimeMillis();
         log.info("START pullMessageFromQueues, time started : {}", startTimeStamp);
+        if (org.apache.commons.collections4.MapUtils.isEmpty(messageQueuesOffsetMap)) {
+            log.info("messageQueuesOffsetMap is null, : {}", startTimeStamp);
+            stopPullMsgLatch.await(PULL_MSG_ERROR_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        }
         for (Map.Entry<MessageQueue, Long> entry : messageQueuesOffsetMap.entrySet()) {
             if (messageQueuesStateMap.containsKey(entry.getKey())) {
+                log.warn("sink task message queue state is not running, sink task id {}, queue info {}, queue state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), JSON.toJSONString(entry.getKey()), JSON.toJSONString(messageQueuesStateMap.get(entry.getKey())));
                 continue;
             }
             log.info("START pullBlockIfNotFound, time started : {}", System.currentTimeMillis());
 
             if (WorkerTaskState.RUNNING != state.get()) {
+                log.warn("sink task state is not running, sink task id {}, state {}", taskConfig.getString(RuntimeConfigDefine.TASK_ID), state.get().name());
                 break;
             }
-            final PullResult pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
+            PullResult pullResult = null;
+            try {
+                shouldStopPullMsg();
+                pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM);
+                pullMsgErrorCount = 0;
+            } catch (MQClientException e) {
+                pullMsgErrorCount++;
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+            } catch (RemotingException e) {
+                pullMsgErrorCount++;
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+            } catch (MQBrokerException e) {
+                pullMsgErrorCount++;
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+            } catch (InterruptedException e) {
+                pullMsgErrorCount++;
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                throw e;
+            } catch (Throwable e) {
+                pullMsgErrorCount++;
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+                throw e;
+            }
             long currentTime = System.currentTimeMillis();
 
+            List<MessageExt> messages = null;
             log.info("INSIDE pullMessageFromQueues, time elapsed : {}", currentTime - startTimeStamp);
-            if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
-                final List<MessageExt> messages = pullResult.getMsgFoundList();
+            if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.FOUND)) {
+                this.incPullTPS(entry.getKey().getTopic(), pullResult.getMsgFoundList().size());
+                messages = pullResult.getMsgFoundList();
                 receiveMessages(messages);
-                messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
-                offsetManagementService.putPosition(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
-                preCommit();
+                if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
+                    messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
+                } else {
+                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+                }
+                try {
+                    consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
+                } catch (MQClientException e) {
+                    log.warn("updateConsumeOffset MQClientException, pullResult {}", pullResult, e);
+                }
+            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.OFFSET_ILLEGAL)) {
+                log.warn("offset illegal, reset offset, message queue {}, pull offset {}, nextBeginOffset {}", JSON.toJSONString(entry.getKey()), entry.getValue(), pullResult.getNextBeginOffset());
+                this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_NEW_MSG)) {
+                log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+            } else if (null != pullResult && pullResult.getPullStatus().equals(PullStatus.NO_MATCHED_MSG)) {
+                log.info("no matched msg, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
+                this.sinkTaskContext.resetOffset(ConnectUtil.convertToRecordPartition(entry.getKey()), ConnectUtil.convertToRecordOffset(pullResult.getNextBeginOffset()));
+            } else {
+                log.info("no new message, pullResult {}, message queue {}, pull offset {}", JSON.toJSONString(pullResult), JSON.toJSONString(entry.getKey()), entry.getValue());
             }
         }
     }
 
-    private void preCommit() {
+    private void shouldStopPullMsg() throws InterruptedException {
+        if (pullMsgErrorCount == PULL_MSG_ERROR_THRESHOLD) {
+            log.error("Accumulative error {} times, stop pull msg for {} ms", pullMsgErrorCount, PULL_MSG_ERROR_BACKOFF_MS);
+            stopPullMsgLatch.await(PULL_MSG_ERROR_BACKOFF_MS, TimeUnit.MILLISECONDS);
+            pullMsgErrorCount = 0;
+        }
+    }
+
+    private void preCommit(boolean isForce) {
         long commitInterval = taskConfig.getLong(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 1000);
         if (nextCommitTime <= 0) {
             long now = System.currentTimeMillis();
             nextCommitTime = now + commitInterval;
         }
-        if (nextCommitTime < System.currentTimeMillis()) {
+        if (isForce || nextCommitTime < System.currentTimeMillis()) {
             Map<RecordPartition, RecordOffset> queueMetaDataLongMap = new HashMap<>(512);
             if (messageQueuesOffsetMap.size() > 0) {
                 for (Map.Entry<MessageQueue, Long> messageQueueLongEntry : messageQueuesOffsetMap.entrySet()) {
@@ -438,7 +444,6 @@ public class WorkerSinkTask implements WorkerTask {
         }
     }
 
-
     @Override
     public void stop() {
         state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
@@ -538,7 +543,6 @@ public class WorkerSinkTask implements WorkerTask {
         return taskConfig;
     }
 
-
     /**
      * Further we cant try to log what caused the error
      */
@@ -546,6 +550,7 @@ public class WorkerSinkTask implements WorkerTask {
     public void timeout() {
         this.state.set(WorkerTaskState.ERROR);
     }
+
     @Override
     public String toString() {
 
@@ -565,26 +570,29 @@ public class WorkerSinkTask implements WorkerTask {
         return obj;
     }
 
-    private enum QueueState {
-        PAUSE
+    public Set<RecordPartition> getRecordPartitions() {
+        return recordPartitions;
     }
 
-    private ByteBuffer convertToByteBufferKey(MessageQueue messageQueue) {
-        return ByteBuffer.wrap((messageQueue.getTopic() + COMMA + messageQueue.getBrokerName() + COMMA + messageQueue.getQueueId()).getBytes());
-    }
-
-    private MessageQueue convertToMessageQueue(ByteBuffer byteBuffer) {
-        byte[] array = byteBuffer.array();
-        String s = String.valueOf(array);
-        String[] split = s.split(COMMA);
-        return new MessageQueue(split[0], split[1], Integer.valueOf(split[2]));
+    /**
+     * Reset the consumer offset for the given queue.
+     *
+     * @param recordPartition the queue to reset offset.
+     * @param recordOffset    the offset to reset to.
+     */
+    public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+        this.sinkTaskContext.resetOffset(recordPartition, recordOffset);
     }
 
-    private ByteBuffer convertToByteBufferValue(Long offset) {
-        return ByteBuffer.wrap(String.valueOf(offset).getBytes());
+    /**
+     * Reset the consumer offsets for the given queue.
+     *
+     * @param offsets the map of offsets for queuename.
+     */
+    public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+        this.sinkTaskContext.resetOffset(offsets);
     }
 
-    private Long convertToOffset(ByteBuffer byteBuffer) {
-        return Long.valueOf(new String(byteBuffer.array()));
-    }
 }
+
+
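
The WorkerSinkTask diff above also changes preCommit to take an isForce flag, so the rebalance listener can flush offsets immediately while the main loop keeps committing on an interval. Below is an editorial sketch of that gate; the class and method names are hypothetical, and the rescheduling of nextCommitTime is an assumption, since the tail of the real method lies outside the hunk shown.

class CommitGateSketch {
    private long nextCommitTime = 0;

    // Returns true when offsets should be flushed: either the caller forces it
    // (e.g. on a queue rebalance) or the commit interval has elapsed.
    boolean shouldCommit(boolean isForce, long commitIntervalMs) {
        long now = System.currentTimeMillis();
        if (nextCommitTime <= 0) {
            nextCommitTime = now + commitIntervalMs;
        }
        if (isForce || nextCommitTime < now) {
            nextCommitTime = now + commitIntervalMs; // assumed: schedule the next window
            return true;
        }
        return false;
    }
}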
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
new file mode 100644
index 0000000..f2f67a0
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.common.QueueState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.BROKER_NAME;
+import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_ID;
+import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
+import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC;
+
+public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    /**
+     * The configs of current sink task.
+     */
+    private final ConnectKeyValue taskConfig;
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    private final Map<MessageQueue, Long> messageQueuesOffsetMap = new ConcurrentHashMap<>(64);
+
+    private final Map<MessageQueue, QueueState> messageQueuesStateMap = new ConcurrentHashMap<>(64);
+
+    private final WorkerSinkTask workerSinkTask;
+
+    private final DefaultMQPullConsumer consumer;
+
+    public WorkerSinkTaskContext(ConnectKeyValue taskConfig, WorkerSinkTask workerSinkTask, DefaultMQPullConsumer consumer) {
+        this.taskConfig = taskConfig;
+        this.workerSinkTask = workerSinkTask;
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+        if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
+            log.warn("recordPartition {} info is null or recordOffset {} info is null", recordPartition, recordOffset);
+            return;
+        }
+        String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+        String topic = (String) recordPartition.getPartition().get(TOPIC);
+        Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+        if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+            log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+            return;
+        }
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+        Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
+        if (null == offset) {
+            log.warn("resetOffset, offset is null");
+            return;
+        }
+        messageQueuesOffsetMap.put(messageQueue, offset);
+    }
+
+    @Override
+    public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
+        if (MapUtils.isEmpty(offsets)) {
+            log.warn("resetOffset, offsets {} is null", offsets);
+            return;
+        }
+        for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
+            if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
+                log.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
+                continue;
+            }
+            RecordPartition recordPartition = entry.getKey();
+            String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+            String topic = (String) recordPartition.getPartition().get(TOPIC);
+            Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+            if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+                log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                continue;
+            }
+            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+            RecordOffset recordOffset = entry.getValue();
+            Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
+            if (null == offset) {
+                log.warn("resetOffset, offset is null");
+                continue;
+            }
+            messageQueuesOffsetMap.put(messageQueue, offset);
+        }
+    }
+
+    @Override
+    public void pause(List<RecordPartition> recordPartitions) {
+        if (recordPartitions == null || recordPartitions.size() == 0) {
+            log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            return;
+        }
+        for (RecordPartition recordPartition : recordPartitions) {
+            if (null == recordPartition || null == recordPartition.getPartition()) {
+                log.warn("recordPartition {} info is null", recordPartition);
+                continue;
+            }
+            String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+            String topic = (String) recordPartition.getPartition().get(TOPIC);
+            Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+            if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+                log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                continue;
+            }
+            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+            if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
+                log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                continue;
+            }
+            messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
+        }
+        this.workerSinkTask.stop();
+    }
+
+    @Override
+    public void resume(List<RecordPartition> recordPartitions) {
+        if (recordPartitions == null || recordPartitions.size() == 0) {
+            log.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            return;
+        }
+        for (RecordPartition recordPartition : recordPartitions) {
+            if (null == recordPartition || null == recordPartition.getPartition()) {
+                log.warn("recordPartition {} info is null", recordPartition);
+                continue;
+            }
+            String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
+            String topic = (String) recordPartition.getPartition().get(TOPIC);
+            Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
+            if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
+                log.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+                continue;
+            }
+            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+            if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
+                log.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                continue;
+            }
+            messageQueuesStateMap.remove(messageQueue);
+        }
+    }
+
+    @Override public Set<RecordPartition> assignment() {
+        return this.workerSinkTask.getRecordPartitions();
+    }
+
+    @Override public String getConnectorName() {
+        return taskConfig.getString("connectorName");
+    }
+
+    @Override public String getTaskName() {
+        return taskConfig.getString("taskId");
+    }
+
+    public Map<MessageQueue, Long> queuesOffsets() {
+        return this.messageQueuesOffsetMap;
+    }
+
+    public void cleanQueuesOffsets() {
+        this.messageQueuesOffsetMap.clear();
+    }
+
+
+}
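
Finally, an editorial sketch of how a SinkTask implementation might build the arguments that the context methods above parse. It assumes RecordPartition and RecordOffset expose Map-based constructors in the OpenMessaging connector API and that the WorkerSinkTask key constants are public (BROKER_NAME is, per the diff above); the helper class is hypothetical.

import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.BROKER_NAME;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_ID;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC;

class SinkTaskContextUsageSketch {

    // Builds the String-keyed, String-valued partition map that resetOffset,
    // pause and resume above expect to parse.
    static RecordPartition partitionOf(String topic, String brokerName, int queueId) {
        Map<String, String> partition = new HashMap<>();
        partition.put(TOPIC, topic);
        partition.put(BROKER_NAME, brokerName);
        partition.put(QUEUE_ID, String.valueOf(queueId));
        return new RecordPartition(partition); // assumed Map-based constructor
    }

    // queueOffset is read back via Long.valueOf((String) ...), so store it as a String.
    static RecordOffset offsetOf(long queueOffset) {
        return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET, String.valueOf(queueOffset))); // assumed Map-based constructor
    }
}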