You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:29 UTC

[rocketmq-connect] 19/39: feat(replicator):add consumer offset sync

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit a3217626bb4c35b5fc71e4a5b704601355a48c06
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Thu Oct 3 23:57:38 2019 +0800

    feat(replicator):add consumer offset sync
    
    - sync src cluster consumer groups offset to message store
    - Add MetaSourceConnector as sourceConnector for task assign
    - Add MetaSourceTask as SourceTask for offset read and message sink
    - Add java 1.8 check
    
    Closes #427
---
 README.md                                          |  12 ++
 pom.xml                                            |   7 +-
 .../apache/rocketmq/replicator/MetaSourceTask.java | 163 +++++++++++++++++
 .../apache/rocketmq/replicator/RmqConstants.java   |   4 +
 .../rocketmq/replicator/RmqMetaReplicator.java     | 196 +++++++++++++++++++++
 .../rocketmq/replicator/RmqSourceReplicator.java   |  79 +++------
 .../apache/rocketmq/replicator/RmqSourceTask.java  |  40 +++--
 .../apache/rocketmq/replicator/common/Utils.java   |  50 +++++-
 .../rocketmq/replicator/config/ConfigDefine.java   |   4 +
 .../rocketmq/replicator/config/DataType.java       |   3 +-
 .../replicator/config/RmqConnectorConfig.java      | 130 ++++++++++++++
 .../rocketmq/replicator/config/TaskConfig.java     |  29 +++
 .../rocketmq/replicator/config/TaskConfigEnum.java |   3 +
 .../replicator/config/TaskDivideConfig.java        |   5 +-
 .../rocketmq/replicator/offset/OffsetSync.java     |  65 +++++++
 .../replicator/offset/OffsetSyncStore.java         |  90 ++++++++++
 .../rocketmq/replicator/schema/FieldName.java      |   3 +-
 .../replicator/RmqSourceReplicatorTest.java        |   7 +-
 18 files changed, 809 insertions(+), 81 deletions(-)

diff --git a/README.md b/README.md
index 332db6e..538568b 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,18 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
 ````
 
+## rocketmq-meta-connector启动
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
+?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+````
+
+
+## rocketmq-rocketmq-connector停止
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
+````
+
 ## rocketmq-replicator参数说明
 
 parameter | type | must | description | sample value
diff --git a/pom.xml b/pom.xml
index 51a6177..dee5b08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,8 +30,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>6</source>
-                    <target>6</target>
+                    <source>8</source>
+                    <target>8</target>
                 </configuration>
             </plugin>
         </plugins>
@@ -107,7 +107,7 @@
             <id>release-all</id>
             <build>
                 <plugins>
-                    <plugin>
+                  <plugin>
                         <artifactId>maven-assembly-plugin</artifactId>
                         <configuration>
                             <descriptors>
@@ -128,6 +128,7 @@
                 <finalName>rocketmq-replicator-${project.version}</finalName>
             </build>
         </profile>
+
     </profiles>
 
 </project>
diff --git a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
new file mode 100644
index 0000000..16bf464
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigUtil;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.replicator.offset.OffsetSyncStore;
+import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetaSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class);
+
+    private final String taskId;
+    private final TaskConfig config;
+    private DefaultMQAdminExt srcMQAdminExt;
+    private volatile boolean started = false;
+
+    private OffsetSyncStore store;
+
+    public MetaSourceTask() {
+        this.config = new TaskConfig();
+        this.taskId = Utils.createTaskId(Thread.currentThread().getName());
+    }
+
+    @Override public void start(KeyValue config) {
+        ConfigUtil.load(config, this.config);
+
+        startAdmin();
+
+        this.store = new OffsetSyncStore(this.srcMQAdminExt, this.config);
+        this.started = true;
+    }
+
+    @Override public void stop() {
+        if (started) {
+            started = false;
+        }
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override public Collection<SourceDataEntry> poll() {
+        log.debug("polling...");
+        List<String> groups = JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
+
+        if (groups == null) {
+            log.info("no group in task.");
+            try {
+                Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+            } catch (InterruptedException e) {
+                throw new IllegalStateException(e);
+            }
+            return Collections.emptyList();
+        }
+        List<SourceDataEntry> res = new ArrayList<>();
+        for (String group : groups) {
+            ConsumeStats stats;
+            try {
+                stats = this.srcMQAdminExt.examineConsumeStats(group);
+            } catch (Exception e) {
+                log.error("admin get consumer info failed for consumer groups: " + group, e);
+                continue;
+            }
+
+            for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
+
+                MessageQueue mq = offsetTable.getKey();
+                long srcOffset = offsetTable.getValue().getConsumerOffset();
+                long targetOffset = this.store.convertTargetOffset(mq, srcOffset);
+
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put(RmqConstants.NEXT_POSITION, srcOffset);
+
+                Schema schema = new Schema();
+                schema.setDataSource(this.config.getSourceRocketmq());
+                schema.setName(mq.getTopic());
+                schema.setFields(new ArrayList<>());
+                schema.getFields().add(new Field(0,
+                    FieldName.OFFSET.getKey(), FieldType.INT64));
+
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.timestamp(System.currentTimeMillis())
+                    .queue(this.config.getStoreTopic())
+                    .entryType(EntryType.UPDATE);
+                dataEntryBuilder.putFiled(FieldName.OFFSET.getKey(), targetOffset);
+
+                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                    ByteBuffer.wrap(RmqConstants.getPartition(
+                        mq.getTopic(),
+                        mq.getBrokerName(),
+                        String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
+                );
+                String targetTopic = new StringBuilder().append(group).append("-").append(mq.getTopic())
+                    .append("-").append(mq.getQueueId()).toString();
+                sourceDataEntry.setQueueName(targetTopic);
+                res.add(sourceDataEntry);
+            }
+        }
+        return res;
+    }
+
+    private void startAdmin() {
+        this.srcMQAdminExt = new DefaultMQAdminExt();
+        this.srcMQAdminExt.setNamesrvAddr(this.config.getSourceRocketmq());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getSourceRocketmq()));
+        try {
+            this.srcMQAdminExt.start();
+        } catch (MQClientException e) {
+            log.error("start src mq admin failed.", e);
+            throw new IllegalStateException("start src mq admin failed");
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
index 4994abe..290ab1c 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
@@ -31,4 +31,8 @@ public class RmqConstants {
     public static String getPartition(String topic, String broker, String queueId) {
         return new StringBuilder().append(broker).append(topic).append(queueId).toString();
     }
+
+    public static String getOffsetTag(String topic, String broker, String queueId, String group) {
+        return new StringBuilder().append(broker).append(topic).append(queueId).append(group).toString();
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
new file mode 100644
index 0000000..38e5af2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RmqMetaReplicator extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
+
+    private static final Set<String> INNER_CONSUMER_GROUPS = new HashSet<>();
+
+    private RmqConnectorConfig replicatorConfig;
+
+    private volatile boolean configValid = false;
+    private Set<String> knownGroups;
+    private DefaultMQAdminExt srcMQAdminExt;
+    private volatile boolean adminStarted;
+    private ScheduledExecutorService executor;
+
+    static {
+        INNER_CONSUMER_GROUPS.add("TOOLS_CONSUMER");
+        INNER_CONSUMER_GROUPS.add("FILTERSRV_CONSUMER");
+        INNER_CONSUMER_GROUPS.add("__MONITOR_CONSUMER");
+        INNER_CONSUMER_GROUPS.add("CLIENT_INNER_PRODUCER");
+        INNER_CONSUMER_GROUPS.add("SELF_TEST_P_GROUP");
+        INNER_CONSUMER_GROUPS.add("SELF_TEST_C_GROUP");
+        INNER_CONSUMER_GROUPS.add("SELF_TEST_TOPIC");
+        INNER_CONSUMER_GROUPS.add("OFFSET_MOVED_EVENT");
+        INNER_CONSUMER_GROUPS.add("CID_ONS-HTTP-PROXY");
+        INNER_CONSUMER_GROUPS.add("CID_ONSAPI_PERMISSION");
+        INNER_CONSUMER_GROUPS.add("CID_ONSAPI_OWNER");
+        INNER_CONSUMER_GROUPS.add("CID_ONSAPI_PULL");
+    }
+
+    public RmqMetaReplicator() {
+        replicatorConfig = new RmqConnectorConfig();
+        knownGroups = new HashSet<>();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
+    }
+
+    @Override public String verifyAndSetConfig(KeyValue config) {
+        log.info("verifyAndSetConfig...");
+        try {
+            replicatorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+
+        this.configValid = true;
+        return "";
+    }
+
+    @Override public void start() {
+        log.info("starting...");
+        startMQAdminTools();
+        executor.scheduleAtFixedRate(() ->
+        {
+            try {
+                refreshConsuemrGroups();
+            } catch (Exception e) {
+                log.error("refresh consumer groups failed.", e);
+            }
+        }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    @Override public void stop() {
+        log.info("stopping...");
+        this.executor.shutdown();
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override public Class<? extends Task> taskClass() {
+        return MetaSourceTask.class;
+    }
+
+    @Override public List<KeyValue> taskConfigs() {
+        log.debug("preparing taskConfig...");
+        if (!configValid) {
+            return new ArrayList<>();
+        }
+
+        startMQAdminTools();
+
+        try {
+            this.knownGroups = this.fetchConsumerGroups();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return Utils.groupPartitions(new ArrayList<>(this.knownGroups), this.replicatorConfig.getTaskParallelism(), replicatorConfig);
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
+
+        try {
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+        } catch (MQClientException e) {
+            log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
+        }
+        adminStarted = true;
+    }
+
+    private void refreshConsuemrGroups() throws InterruptedException, RemotingConnectException, MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
+        log.debug("refreshConsuemrGroups...");
+        Set<String> groups = fetchConsumerGroups();
+        Set<String> newGroups = new HashSet<>();
+        Set<String> deadGroups = new HashSet<>();
+        newGroups.addAll(groups);
+        newGroups.removeAll(knownGroups);
+        deadGroups.addAll(knownGroups);
+        deadGroups.removeAll(groups);
+        if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
+            log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
+            knownGroups = groups;
+            context.requestTaskReconfiguration();
+        }
+    }
+
+    private Set<String> fetchConsumerGroups() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return listGroups().stream().filter(this::skipInnerGroup).collect(Collectors.toSet());
+    }
+
+    private Set<String> listGroups() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        Set<String> groups = new HashSet<>();
+        ClusterInfo clusterInfo = this.srcMQAdminExt.examineBrokerClusterInfo();
+        String[] addrs = clusterInfo.retrieveAllAddrByCluster(this.replicatorConfig.getSrcCluster());
+        for (String addr : addrs) {
+            ConsumeStatsList stats = this.srcMQAdminExt.fetchConsumeStatsInBroker(addr, true, 3 * 1000);
+            stats.getConsumeStatsList().stream().map(kv -> kv.keySet()).forEach(groups::addAll);
+        }
+        return groups;
+    }
+
+    private boolean skipInnerGroup(String group) {
+        if (INNER_CONSUMER_GROUPS.contains(group) || group.startsWith("CID_RMQ_SYS_") || group.startsWith("PositionManage") ||
+            group.startsWith("ConfigManage") || group.startsWith("OffsetManage")) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 429102e..c560840 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -47,12 +46,9 @@ import org.apache.rocketmq.replicator.common.ConstDefine;
 import org.apache.rocketmq.replicator.common.Utils;
 import org.apache.rocketmq.replicator.config.ConfigDefine;
 import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
-import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
-import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
-import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
-import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,18 +61,12 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private boolean syncRETRY = false;
 
-    private KeyValue replicatorConfig;
+    private RmqConnectorConfig replicatorConfig;
 
     private Map<String, List<TaskTopicInfo>> topicRouteMap;
 
-    private TaskDivideStrategy taskDivideStrategy;
-
-    private Set<String> whiteList;
-
     private volatile boolean configValid = false;
 
-    private int taskParallelism = 1;
-
     private DefaultMQAdminExt srcMQAdminExt;
     private DefaultMQAdminExt targetMQAdminExt;
 
@@ -86,7 +76,7 @@ public class RmqSourceReplicator extends SourceConnector {
 
     public RmqSourceReplicator() {
         topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
-        whiteList = new HashSet<String>();
+        replicatorConfig = new RmqConnectorConfig();
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
@@ -96,14 +86,14 @@ public class RmqSourceReplicator extends SourceConnector {
         }
         RPCHook rpcHook = null;
         this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+        this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
         this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
 
         this.targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ));
+        this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getTargetNamesrvs());
         this.targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-        this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ)));
+        this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getTargetNamesrvs()));
 
         try {
             this.srcMQAdminExt.start();
@@ -128,29 +118,11 @@ public class RmqSourceReplicator extends SourceConnector {
             }
         }
 
-        // Check the whitelist, whitelist is required.
-        String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
-        String[] wl = whileListStr.trim().split(",");
-        if (wl.length <= 0)
-            return "White list must be not empty.";
-        else {
-            for (String t : wl) {
-                this.whiteList.add(t.trim());
-            }
-        }
-
-        if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
-            config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
-            this.taskDivideStrategy = new DivideTaskByQueue();
-        } else {
-            this.taskDivideStrategy = new DivideTaskByTopic();
-        }
-
-        if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
-            this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+        try {
+            this.replicatorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
         }
-
-        this.replicatorConfig = config;
         this.configValid = true;
         return "";
     }
@@ -173,7 +145,7 @@ public class RmqSourceReplicator extends SourceConnector {
                     context.requestTaskReconfiguration();
                 }
             }
-        }, 30, 30, TimeUnit.SECONDS);
+        }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
     }
 
     public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
@@ -225,21 +197,22 @@ public class RmqSourceReplicator extends SourceConnector {
         buildRoute();
 
         TaskDivideConfig tdc = new TaskDivideConfig(
-            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
-            this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
-            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+            this.replicatorConfig.getSrcNamesrvs(),
+            this.replicatorConfig.getSrcCluster(),
+            this.replicatorConfig.getStoreTopic(),
+            this.replicatorConfig.getConverter(),
             DataType.COMMON_MESSAGE.ordinal(),
-            this.taskParallelism
+            this.replicatorConfig.getTaskParallelism()
         );
-        return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+        return this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc);
     }
 
     public void buildRoute() {
         List<Pattern> patterns = new ArrayList<Pattern>();
-        String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+        String srcCluster = this.replicatorConfig.getSrcCluster();
         try {
             Set<String> targetTopicSet = fetchTargetTopics();
-            for (String topic : this.whiteList) {
+            for (String topic : this.replicatorConfig.getWhiteList()) {
                 Pattern pattern = Pattern.compile(topic);
                 patterns.add(pattern);
             }
@@ -290,16 +263,12 @@ public class RmqSourceReplicator extends SourceConnector {
         }
     }
 
-    public void setWhiteList(Set<String> whiteList) {
-        this.whiteList = whiteList;
-    }
-
     public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
         return this.topicRouteMap;
     }
 
     public Set<String> fetchTargetTopics() throws RemotingException, MQClientException, InterruptedException {
-        String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+        String targetCluster = this.replicatorConfig.getTargetCluster();
         TopicList targetTopics = this.targetMQAdminExt.fetchTopicsByCLuster(targetCluster);
         return targetTopics.getTopicList();
     }
@@ -317,8 +286,8 @@ public class RmqSourceReplicator extends SourceConnector {
      */
     public void ensureTargetTopic(String srcTopic,
         String targetTopic) throws RemotingException, MQClientException, InterruptedException {
-        String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
-        String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+        String srcCluster = this.replicatorConfig.getSrcCluster();
+        String targetCluster = this.replicatorConfig.getTargetCluster();
 
         List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, srcTopic, srcCluster);
         if (brokerList.size() == 0) {
@@ -334,7 +303,7 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     public String generateTargetTopic(String topic) {
-        String fmt = this.replicatorConfig.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+        String fmt = this.replicatorConfig.getRenamePattern();
         if (StringUtils.isNotEmpty(fmt)) {
             Map<String, String> params = new HashMap<String, String>();
             params.put("topic", topic);
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 42dbab8..d965898 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,8 +19,20 @@ package org.apache.rocketmq.replicator;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -31,11 +43,11 @@ import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskConfig;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.*;
 
 public class RmqSourceTask extends SourceTask {
 
@@ -52,7 +64,7 @@ public class RmqSourceTask extends SourceTask {
         this.config = new TaskConfig();
         this.consumer = new DefaultMQPullConsumer();
         this.taskId = Utils.createTaskId(Thread.currentThread().getName());
-        mqOffsetMap = new HashMap<TaskTopicInfo, Long>();
+        mqOffsetMap = new HashMap<>();
     }
 
     public Collection<SourceDataEntry> poll() {
@@ -84,15 +96,15 @@ public class RmqSourceTask extends SourceTask {
             for (TaskTopicInfo tti : topicList) {
                 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
                 for (MessageQueue mq : mqs) {
-                    if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
+                    if (tti.getQueueId() == mq.getQueueId()) {
                         ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
                             ByteBuffer.wrap(RmqConstants.getPartition(
                                 mq.getTopic(),
                                 mq.getBrokerName(),
-                                String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+                                String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)));
 
                         if (null != positionInfo && positionInfo.array().length > 0) {
-                            String positionJson = new String(positionInfo.array(), "UTF-8");
+                            String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
                             JSONObject jsonObject = JSONObject.parseObject(positionJson);
                             this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
                         } else {
@@ -129,7 +141,7 @@ public class RmqSourceTask extends SourceTask {
 
     private Collection<SourceDataEntry> pollCommonMessage() {
 
-        List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
+        List<SourceDataEntry> res = new ArrayList<>();
         if (started) {
             try {
                 for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
@@ -144,7 +156,7 @@ public class RmqSourceTask extends SourceTask {
                             Schema schema = new Schema();
                             schema.setDataSource(this.config.getSourceRocketmq());
                             schema.setName(taskTopicConfig.getTopic());
-                            schema.setFields(new ArrayList<Field>());
+                            schema.setFields(new ArrayList<>());
                             schema.getFields().add(new Field(0,
                                 FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
 
@@ -152,12 +164,13 @@ public class RmqSourceTask extends SourceTask {
                             dataEntryBuilder.timestamp(System.currentTimeMillis())
                                 .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                             dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
+
                             SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
                                 ByteBuffer.wrap(RmqConstants.getPartition(
                                     taskTopicConfig.getTopic(),
                                     taskTopicConfig.getBrokerName(),
-                                    String.valueOf(taskTopicConfig.getQueueId())).getBytes("UTF-8")),
-                                ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+                                    String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+                                ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
                             );
                             sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
                             res.add(sourceDataEntry);
@@ -179,15 +192,16 @@ public class RmqSourceTask extends SourceTask {
     }
 
     private Collection<SourceDataEntry> pollTopicConfig() {
-        return new ArrayList<SourceDataEntry>();
+        DefaultMQAdminExt srcMQAdminExt;
+        return new ArrayList<>();
     }
 
     private Collection<SourceDataEntry> pollBrokerConfig() {
-        return new ArrayList<SourceDataEntry>();
+        return new ArrayList<>();
     }
 
     private Collection<SourceDataEntry> pollSubConfig() {
-        return new ArrayList<SourceDataEntry>();
+        return new ArrayList<>();
     }
 }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index f3e6be6..4134a2a 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.replicator.common;
 
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -26,6 +29,9 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 import org.slf4j.Logger;
@@ -48,7 +54,7 @@ public class Utils {
 
     public static String createInstanceName(String namesrvAddr) {
         String[] namesrvArray = namesrvAddr.split(";");
-        List<String> namesrvList = new ArrayList<String>();
+        List<String> namesrvList = new ArrayList<>();
         for (String ns : namesrvArray) {
             if (!namesrvList.contains(ns)) {
                 namesrvList.add(ns);
@@ -60,7 +66,7 @@ public class Utils {
 
     public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
         String cluster) throws RemotingException, MQClientException, InterruptedException {
-        List<BrokerData> brokerList = new ArrayList<BrokerData>();
+        List<BrokerData> brokerList = new ArrayList<>();
 
         TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
         if (topicRouteData.getBrokerDatas() != null) {
@@ -79,7 +85,7 @@ public class Utils {
                 CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
             for (String addr : masterSet) {
                 defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
-                log.info("create topic to %s success.%n", addr);
+                log.info("create topic to {} success.%n", addr);
             }
 
             if (topicConfig.isOrder()) {
@@ -94,12 +100,44 @@ public class Utils {
                 }
                 defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                     orderConf.toString(), true);
-                log.info("set cluster orderConf=[%s]", orderConf);
+                log.info("set cluster orderConf=[{}]", orderConf);
             }
-
-            return;
         } catch (Exception e) {
             throw new IllegalArgumentException("create topic: " + topicConfig + "failed", e);
         }
     }
+
+    public static List<KeyValue> groupPartitions(List<String> elements, int numGroups, RmqConnectorConfig tdc) {
+        if (numGroups <= 0)
+            throw new IllegalArgumentException("Number of groups must be positive.");
+
+        List<KeyValue> result = new ArrayList<>(numGroups);
+
+        // Each group has either n+1 or n raw partitions
+        int perGroup = elements.size() / numGroups;
+        int leftover = elements.size() - (numGroups * perGroup);
+
+        int assigned = 0;
+        for (int group = 0; group < numGroups; group++) {
+            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+            KeyValue keyValue = new DefaultKeyValue();
+            List<String> groupList = new ArrayList<>();
+            for (int i = 0; i < numThisGroup; i++) {
+                groupList.add(elements.get(assigned));
+                assigned++;
+            }
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster());
+            keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getSrcCluster());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_GROUP_INFO.getKey(), JSONObject.toJSONString(groupList));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getConverter());
+            result.add(keyValue);
+
+            log.debug("allocate group partition: {}", keyValue);
+        }
+
+        return result;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 5b15821..a3514ee 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -43,6 +43,10 @@ public class ConfigDefine {
 
     public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
 
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    public static final String OFFSET_SYNC_TOPIC = "offset.sync.topic";
+
     /**
      * The required key for all configurations.
      */
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/DataType.java b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
index 75f772e..60dc330 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
@@ -21,5 +21,6 @@ public enum DataType {
     COMMON_MESSAGE,
     TOPIC_CONFIG,
     BROKER_CONFIG,
-    SUB_CONFIG
+    SUB_CONFIG,
+    OFFSET
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
new file mode 100644
index 0000000..395d75d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator.config;
+
+import io.openmessaging.KeyValue;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
+
+public class RmqConnectorConfig {
+
+    private int taskParallelism;
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String targetNamesrvs;
+    private String srcCluster;
+    private String targetCluster;
+    private TaskDivideStrategy taskDivideStrategy;
+    private String storeTopic;
+    private String converter;
+    private long refreshInterval;
+    private String renamePattern;
+    private String offsetSyncTopic;
+
+    public RmqConnectorConfig() {
+    }
+
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
+
+        int strategy = config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+        if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+            this.taskDivideStrategy = new DivideTaskByQueue();
+        } else {
+            this.taskDivideStrategy = new DivideTaskByTopic();
+        }
+
+        buildWhiteList(config);
+
+        srcNamesrvs = config.getString(ConfigDefine.CONN_SOURCE_RMQ);
+        srcCluster = config.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+        targetNamesrvs = config.getString(ConfigDefine.CONN_TARGET_RMQ);
+        targetCluster = config.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+
+        storeTopic = config.getString(ConfigDefine.CONN_STORE_TOPIC);
+        converter = config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER);
+        refreshInterval = config.getLong(ConfigDefine.REFRESH_INTERVAL, 3);
+        renamePattern = config.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+        offsetSyncTopic = config.getString(ConfigDefine.OFFSET_SYNC_TOPIC);
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<String>();
+        String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST, "");
+        String[] wl = whileListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+    public int getTaskParallelism() {
+        return this.taskParallelism;
+    }
+
+    public Set<String> getWhiteList() {
+        return this.whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getTargetNamesrvs() {
+        return this.targetNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public String getTargetCluster() {
+        return this.targetCluster;
+    }
+
+    public TaskDivideStrategy getTaskDivideStrategy() {
+        return this.taskDivideStrategy;
+    }
+
+    public String getStoreTopic() {
+        return this.storeTopic;
+    }
+
+    public String getConverter() {
+        return this.converter;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public String getRenamePattern() {
+        return this.renamePattern;
+    }
+
+    public String getOffsetSyncTopic() {
+        return this.offsetSyncTopic;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
index e85280f..15676ce 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
@@ -16,14 +16,19 @@
  */
 package org.apache.rocketmq.replicator.config;
 
+import java.util.List;
+
 public class TaskConfig {
 
+    private String sourceCluster;
     private String storeTopic;
     private String sourceGroup;
     private String sourceRocketmq;
     private Integer dataType;
     private Long nextPosition;
     private String taskTopicList;
+    private String taskGroupList;
+    private String offsetSyncTopic;
 
     public String getSourceGroup() {
         return sourceGroup;
@@ -73,7 +78,31 @@ public class TaskConfig {
         return taskTopicList;
     }
 
+    public void setTaskGroupList(String taskGroupList) {
+        this.taskGroupList = taskGroupList;
+    }
+
+    public String getTaskGroupList() {
+        return this.taskGroupList;
+    }
+
     public void setTaskTopicList(String taskTopicList) {
         this.taskTopicList = taskTopicList;
     }
+
+    public void setSourceCluster(String sourceCluster) {
+        this.sourceCluster = sourceCluster;
+    }
+
+    public String getSourceCluster() {
+        return this.sourceCluster;
+    }
+
+    public void setOffsetSyncTopic(String offsetSyncTopic) {
+        this.offsetSyncTopic = offsetSyncTopic;
+    }
+
+    public String getOffsetSyncTopic() {
+        return offsetSyncTopic;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
index 1516f7a..01c6787 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
@@ -21,6 +21,8 @@ public enum TaskConfigEnum {
     TASK_ID("taskId"),
     TASK_SOURCE_GROUP("sourceGroup"),
     TASK_SOURCE_ROCKETMQ("sourceRocketmq"),
+    TASK_SOURCE_CLUSTER("sourceCluster"),
+    TASK_OFFSET_SYNC_TOPIC("offsetSyncTopic"),
     TASK_SOURCE_TOPIC("sourceTopic"),
     TASK_STORE_ROCKETMQ("storeTopic"),
     TASK_DATA_TYPE("dataType"),
@@ -28,6 +30,7 @@ public enum TaskConfigEnum {
     TASK_QUEUE_ID("queueId"),
     TASK_NEXT_POSITION("nextPosition"),
     TASK_TOPIC_INFO("taskTopicList"),
+    TASK_GROUP_INFO("taskGroupList"),
     TASK_SOURCE_RECORD_CONVERTER("source-record-converter");
 
     private String key;
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index e6a8144..07d95c8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -20,6 +20,8 @@ public class TaskDivideConfig {
 
     private String sourceNamesrvAddr;
 
+    private String srcCluster;
+
     private String storeTopic;
 
     private String srcRecordConverter;
@@ -28,9 +30,10 @@ public class TaskDivideConfig {
 
     private int taskParallelism;
 
-    public TaskDivideConfig(String sourceNamesrvAddr, String storeTopic, String srcRecordConverter,
+    public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String storeTopic, String srcRecordConverter,
                             int dataType, int taskParallelism) {
         this.sourceNamesrvAddr = sourceNamesrvAddr;
+        this.srcCluster = srcCluster;
         this.storeTopic = storeTopic;
         this.srcRecordConverter = srcRecordConverter;
         this.dataType = dataType;
diff --git a/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java
new file mode 100644
index 0000000..4678d5b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java
@@ -0,0 +1,65 @@
+package org.apache.rocketmq.replicator.offset;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class OffsetSync {
+
+    private MessageQueue mq;
+    private long srcOffset;
+    private long targtOffset;
+
+    public OffsetSync(MessageQueue mq, long srcOffset, long targtOffset) {
+        this.mq = mq;
+        this.srcOffset = srcOffset;
+        this.targtOffset = targtOffset;
+    }
+
+    public void setMq(MessageQueue mq) {
+        this.mq = mq;
+    }
+
+    public void setSrcOffset(long srcOffset) {
+        this.srcOffset = srcOffset;
+    }
+
+    public void setTargtOffset(long targtOffset) {
+        this.targtOffset = targtOffset;
+    }
+
+    public long getSrcOffset() {
+        return this.srcOffset;
+    }
+
+    public long getTargtOffset() {
+        return this.targtOffset;
+    }
+
+    public MessageQueue getMq() {
+        return this.mq;
+    }
+
+    public byte[] encode() {
+        return JSON.toJSONBytes(this);
+    }
+
+    public static OffsetSync decode(byte[] body) {
+        OffsetSync sync = JSON.parseObject(body, OffsetSync.class);
+        return sync;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java
new file mode 100644
index 0000000..5703f39
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java
@@ -0,0 +1,90 @@
+package org.apache.rocketmq.replicator.offset;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class OffsetSyncStore {
+
+    private DefaultMQAdminExt adminExt;
+    private TaskConfig taskConfig;
+
+    private DefaultMQPullConsumer consumer;
+    private Map<MessageQueue, OffsetSync> syncs;
+    private long lastOffset;
+
+    public OffsetSyncStore(DefaultMQAdminExt adminExt,
+        TaskConfig taskConfig) {
+        this.adminExt = adminExt;
+        this.taskConfig = taskConfig;
+        this.syncs = new HashMap<MessageQueue, OffsetSync>();
+        this.consumer = new DefaultMQPullConsumer();
+    }
+
+    public long convertTargetOffset(MessageQueue mq, long srcOffset) {
+        OffsetSync offsetSync = latestOffsetSync(mq);
+        if (offsetSync.getSrcOffset() > srcOffset) {
+            return -1;
+        }
+        long delta = srcOffset - offsetSync.getSrcOffset();
+        return offsetSync.getTargtOffset() + delta;
+    }
+
+    private boolean sync(
+        Duration pullTimeout) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        TopicRouteData route = adminExt.examineTopicRouteInfo(taskConfig.getOffsetSyncTopic());
+        String brokerName = route.getQueueDatas().get(0).getBrokerName();
+        MessageQueue mq = new MessageQueue(taskConfig.getOffsetSyncTopic(), brokerName, 0);
+
+        PullResult pr = consumer.pull(mq, "", lastOffset, 0, pullTimeout.getNano() / Duration.ofMillis(1).getNano());
+        if (pr.getPullStatus() != PullStatus.FOUND) {
+            return false;
+        }
+        handle(pr);
+        return true;
+    }
+
+    private void handle(PullResult result) {
+        for (MessageExt msg : result.getMsgFoundList()) {
+            byte[] body = msg.getBody();
+            OffsetSync sync = OffsetSync.decode(body);
+            syncs.put(sync.getMq(), sync);
+        }
+    }
+
+    private OffsetSync latestOffsetSync(MessageQueue queue) {
+        return syncs.computeIfAbsent(queue, new Function<MessageQueue, OffsetSync>() {
+            @Override public OffsetSync apply(MessageQueue queue) {
+                return new OffsetSync(queue, -1, -1);
+            }
+        });
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
index 913ffca..a8acd08 100644
--- a/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
+++ b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
@@ -17,7 +17,8 @@
 package org.apache.rocketmq.replicator.schema;
 
 public enum FieldName {
-    COMMON_MESSAGE("MessageExt");
+    COMMON_MESSAGE("MessageExt"),
+    OFFSET("Offset");
 
     private String key;
 
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index f271f14..795c386 100644
--- a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,10 +52,14 @@ public class RmqSourceReplicatorTest {
     @Test
     public void testGenerateTopic() throws NoSuchFieldException {
         RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
+
+        RmqConnectorConfig config = new RmqConnectorConfig();
         KeyValue kv = new DefaultKeyValue();
         kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
+        config.validate(kv);
+
         Field field = RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
-        FieldSetter.setField(rmqSourceReplicator, field, kv);
+        FieldSetter.setField(rmqSourceReplicator, field, config);
         String dstTopic = rmqSourceReplicator.generateTargetTopic("dest");
         assertThat(dstTopic).isEqualTo("dest.replica");
     }