You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:12 UTC

[rocketmq-connect] 02/39: init commit (#309)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 208e4ed922ddb19efb56b3d924f9c870fcc8a322
Author: chuenfaiy <ch...@163.com>
AuthorDate: Fri Jul 12 21:33:26 2019 +0800

    init commit (#309)
---
 .gitignore                                         |  14 ++
 pom.xml                                            |  79 +++++++++
 .../apache/rocketmq/connector/RmqConstants.java    |  32 ++++
 .../rocketmq/connector/RmqSourceConnector.java     | 129 +++++++++++++++
 .../apache/rocketmq/connector/RmqSourceTask.java   | 182 +++++++++++++++++++++
 .../apache/rocketmq/connector/common/Utils.java    |  28 ++++
 .../rocketmq/connector/config/ConfigDefine.java    |  48 ++++++
 .../rocketmq/connector/config/ConfigUtil.java      |  70 ++++++++
 .../apache/rocketmq/connector/config/DataType.java |  25 +++
 .../rocketmq/connector/config/TaskConfig.java      |  88 ++++++++++
 .../rocketmq/connector/schema/FieldName.java       |  31 ++++
 .../connector/strategy/DivideStrategyEnum.java     |  23 +++
 .../connector/strategy/DivideTaskByQueue.java      |  51 ++++++
 .../connector/strategy/DivideTaskByTopic.java      |  49 ++++++
 .../connector/strategy/TaskDivideStrategy.java     |  28 ++++
 15 files changed, 877 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..36716aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+*.iml
+*.class
+*.jar
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+.DS_Store
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 0c79eee..13a5af4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,4 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -8,5 +25,67 @@
     <artifactId>rocketmq-connector</artifactId>
     <version>1.0-SNAPSHOT</version>
 
+    <properties>
+        <rocketmq.version>4.4.0</rocketmq.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>joor</artifactId>
+            <version>0.9.6</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connect</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+    </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java b/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
new file mode 100644
index 0000000..1278424
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+/** String keys shared by the connector and its tasks, plus the partition-key helper. */
+public class RmqConstants {
+
+    public static final String BROKER_NAME = "brokerName";
+    public static final String TOPIC_NAME = "topic";
+    public static final String QUEUE_ID = "queueId";
+    /** JSON key under which RmqSourceTask stores the next pull offset. */
+    public static final String NEXT_POSITION = "nextPosition";
+
+    /** Concatenates broker, topic and queueId (in that order, no separator) into one key. */
+    public static String getPartition(String topic, String broker, String queueId) {
+        return broker + topic + queueId;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
new file mode 100644
index 0000000..c919fa3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.connector.strategy.TaskDivideStrategy;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Source connector that turns the readable queues of the source cluster's topics into task configs. */
+public class RmqSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
+
+    private KeyValue config;
+
+    private Map<String, List<MessageQueue>> topicRouteMap;
+
+    private TaskDivideStrategy taskDivideStrategy;
+
+    public RmqSourceConnector() {
+        // The divide strategy is chosen in verifyAndSetConfig: config is still null at this point.
+        topicRouteMap = new HashMap<String, List<MessageQueue>>();
+    }
+
+    /**
+     * Verifies that every key of ConfigDefine.REQUEST_CONFIG is present, stores the config and
+     * picks the divide strategy (the BY_TOPIC ordinal selects by-topic, anything else by-queue).
+     * @return an empty string on success, otherwise a message naming a missing key
+     */
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        if (config.getInt(ConfigDefine.TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_TOPIC.ordinal()) {
+            this.taskDivideStrategy = new DivideTaskByTopic();
+        } else {
+            this.taskDivideStrategy = new DivideTaskByQueue();
+        }
+        return "";
+    }
+
+    public void start() { }
+
+    public void stop() { }
+
+    public void pause() { }
+
+    public void resume() { }
+
+    /** Returns the task class to instantiate for each task config (was null before). */
+    public Class<? extends Task> taskClass() {
+        return RmqSourceTask.class;
+    }
+
+    /**
+     * Lists the source cluster's topics, records the readable queues of every topic other than the
+     * configured store topic in topicRouteMap and divides them; fetch errors are only logged.
+     */
+    public List<KeyValue> taskConfigs() {
+        String sourceRmq = this.config.getString(ConfigDefine.SOURCE_RMQ);
+        // Skip the configured store topic; comparing with the key name "storeTopic" never matched it.
+        String storeTopic = this.config.getString(ConfigDefine.STORE_TOPIC);
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, sourceRmq);
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt((RPCHook) null);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            defaultMQAdminExt.start();
+            TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+            for (String topic : topicList.getTopicList()) {
+                if (!topic.equals(storeTopic)) {
+                    TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                    if (!topicRouteMap.containsKey(topic)) {
+                        topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                    }
+                    for (QueueData qd : topicRouteData.getQueueDatas()) {
+                        if (PermName.isReadable(qd.getPerm())) {
+                            for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                                topicRouteMap.get(topic).add(new MessageQueue(topic, qd.getBrokerName(), i));
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("fetch topic list error: ", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+        return this.taskDivideStrategy.divide(this.topicRouteMap, sourceRmq, storeTopic);
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
new file mode 100644
index 0000000..d71dc87
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.common.Utils;
+import org.apache.rocketmq.connector.config.ConfigUtil;
+import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskConfig;
+import org.apache.rocketmq.connector.schema.FieldName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/** Source task that pulls messages from queues of the source topic and emits them as data entries. */
+public class RmqSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class);
+
+    private final String taskId;
+    private final TaskConfig config;
+    private final DefaultMQPullConsumer consumer;
+
+    /** Next offset to pull, per queue read by this task. */
+    private Map<MessageQueue, Long> mqOffsetMap;
+
+    public RmqSourceTask() {
+        this.config = new TaskConfig();
+        this.consumer = new DefaultMQPullConsumer();
+        this.taskId = Utils.createTaskId(Thread.currentThread().getName());
+        mqOffsetMap = new HashMap<MessageQueue, Long>();
+    }
+
+    /** Dispatches to the poll routine selected by the configured data type ordinal. */
+    public Collection<SourceDataEntry> poll() {
+        if (this.config.getDataType() == DataType.COMMON_MESSAGE.ordinal()) {
+            return pollCommonMessage();
+        } else if (this.config.getDataType() == DataType.TOPIC_CONFIG.ordinal()) {
+            return pollTopicConfig();
+        } else if (this.config.getDataType() == DataType.BROKER_CONFIG.ordinal()) {
+            return pollBrokerConfig();
+        } else {
+            return pollSubConfig();
+        }
+    }
+
+    /**
+     * Loads the task config, starts the pull consumer and records a start offset for the queues
+     * of the source topic; when a queue id is configured, only queues with that id are tracked.
+     */
+    public void start(KeyValue config) {
+        ConfigUtil.load(config, this.config);
+        this.consumer.setConsumerGroup(Utils.createGroupName(this.config.getSourceTopic()));
+        this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+        try {
+            this.consumer.start();
+            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(this.config.getSourceTopic());
+            String queueId = this.config.getQueueId();
+            boolean allQueues = queueId.equals("");
+            for (MessageQueue mq : mqs) {
+                if (allQueues || Integer.parseInt(queueId) == mq.getQueueId()) {
+                    mqOffsetMap.put(mq, resolveStartOffset(mq));
+                }
+            }
+        } catch (Exception e) {
+            log.error("consumer of task {} start failed. ", this.taskId, e);
+        }
+    }
+
+    public void stop() {
+        this.consumer.shutdown();
+    }
+
+    public void pause() {
+    }
+
+    public void resume() {
+    }
+
+    /** Start offset of mq: the stored position if any, else the configured next position, else 0. */
+    private long resolveStartOffset(MessageQueue mq) throws Exception {
+        ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(partitionOf(mq));
+        if (null != positionInfo && positionInfo.array().length > 0) {
+            String positionJson = new String(positionInfo.array(), "UTF-8");
+            Long stored = JSONObject.parseObject(positionJson).getLong(RmqConstants.NEXT_POSITION);
+            if (stored != null) {
+                return stored;
+            }
+        }
+        // Kept local on purpose: writing it back to config leaked one queue's offset to the next.
+        Long configured = this.config.getNextPosition();
+        return configured == null ? 0L : configured;
+    }
+
+    /** Builds the position-storage key of a queue from its own topic, broker name and queue id. */
+    private static ByteBuffer partitionOf(MessageQueue mq) throws Exception {
+        return ByteBuffer.wrap(RmqConstants.getPartition(mq.getTopic(), mq.getBrokerName(),
+                String.valueOf(mq.getQueueId())).getBytes("UTF-8"));
+    }
+
+    /**
+     * Pulls up to 32 messages from every tracked queue, advances that queue's offset and wraps
+     * each message (as a JSON string) in a SourceDataEntry addressed to the store topic.
+     */
+    private Collection<SourceDataEntry> pollCommonMessage() {
+        List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
+        try {
+            for (MessageQueue mq : this.mqOffsetMap.keySet()) {
+                PullResult pullResult = consumer.pull(mq, "*", this.mqOffsetMap.get(mq), 32);
+                switch (pullResult.getPullStatus()) {
+                    case FOUND: {
+                        this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
+                        List<MessageExt> msgs = pullResult.getMsgFoundList();
+                        for (MessageExt m : msgs) {
+                            Schema schema = new Schema();
+                            schema.setDataSource(this.config.getSourceRocketmq());
+                            schema.setName(this.config.getSourceTopic());
+                            schema.setFields(new ArrayList<Field>());
+                            schema.getFields().add(new Field(0, FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+                            DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                            dataEntryBuilder.timestamp(System.currentTimeMillis())
+                                    .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+                            dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(m));
+                            // Key the position by the queue actually pulled, matching the lookup in start().
+                            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                                    partitionOf(mq),
+                                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+                            );
+                            res.add(sourceDataEntry);
+                        }
+                        break;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            log.error("Rocketmq connector task poll error, current config: {}", JSON.toJSONString(config), e);
+        }
+
+        return res;
+    }
+
+    private Collection<SourceDataEntry> pollTopicConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+
+    private Collection<SourceDataEntry> pollBrokerConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+
+    private Collection<SourceDataEntry> pollSubConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
new file mode 100644
index 0000000..71f538c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.common;
+
+/** Name helpers; both methods return the prefix, "_" and the current time in milliseconds. */
+public class Utils {
+    public static String createGroupName(String prefix) {
+        return prefix + "_" + System.currentTimeMillis();
+    }
+
+    public static String createTaskId(String prefix) {
+        return prefix + "_" + System.currentTimeMillis();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
new file mode 100644
index 0000000..64e9c94
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Names of the configuration keys read by the connector and its tasks. */
+public class ConfigDefine {
+
+    public static final String SOURCE_RMQ = "sourceRocketmq";
+
+    public static final String STORE_TOPIC = "storeTopic";
+
+    public static final String TARGET_RMQ = "targetRocketmq";
+
+    public static final String DATA_TYPE = "dataType";
+
+    public static final String QUEUE_ID = "queueId";
+
+    public static final String TASK_DIVIDE_STRATEGY = "taskDivideStrategy";
+
+    public static final String BROKER_NAME = "brokerName";
+
+    public static final String SOURCE_TOPIC = "sourceTopic";
+
+    /**
+     * Keys that must be present in a connector config (checked by
+     * RmqSourceConnector.verifyAndSetConfig). Built from the constants above in a plain
+     * HashSet rather than a double-brace anonymous subclass with repeated string literals.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(
+            java.util.Arrays.asList(SOURCE_RMQ, TARGET_RMQ, STORE_TOPIC, TASK_DIVIDE_STRATEGY));
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
new file mode 100644
index 0000000..d2dd7da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    /**
+     * For every public one-argument setXxx method of object, reads key xxx from props and, when a
+     * value is present, parses it to the parameter type (int, long, double, boolean, float or
+     * String, primitive or boxed) and invokes the setter. Loading is best-effort: a value that
+     * cannot be parsed or applied is skipped and the property keeps its previous value.
+     */
+    public static <T> void load(KeyValue props, Object object) {
+        properties2Object(props, object);
+    }
+
+    private static void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            Class<?>[] pt = method.getParameterTypes();
+            // Only real setters: "set" plus a property name and exactly one parameter.
+            if (!mn.startsWith("set") || mn.length() <= 3 || pt.length != 1) {
+                continue;
+            }
+            String key = mn.substring(3, 4).toLowerCase() + mn.substring(4);
+            try {
+                String property = p.getString(key);
+                Object arg = property == null ? null : convert(pt[0].getSimpleName(), property);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (Exception ignored) {
+                // Deliberately best-effort; Errors are no longer swallowed (was catch Throwable).
+            }
+        }
+    }
+
+    /** Parses property into the type named cn, or returns null when the type is unsupported. */
+    private static Object convert(String cn, String property) {
+        if (cn.equals("int") || cn.equals("Integer")) {
+            return Integer.parseInt(property);
+        } else if (cn.equals("long") || cn.equals("Long")) {
+            return Long.parseLong(property);
+        } else if (cn.equals("double") || cn.equals("Double")) {
+            return Double.parseDouble(property);
+        } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+            return Boolean.parseBoolean(property);
+        } else if (cn.equals("float") || cn.equals("Float")) {
+            return Float.parseFloat(property);
+        }
+        return cn.equals("String") ? property : null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/DataType.java b/src/main/java/org/apache/rocketmq/connector/config/DataType.java
new file mode 100644
index 0000000..3e77a3a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/DataType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+public enum DataType {
+    // Order matters: RmqSourceTask.poll() compares the configured dataType with ordinal().
+    COMMON_MESSAGE,
+    TOPIC_CONFIG,
+    BROKER_CONFIG,
+    SUB_CONFIG
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
new file mode 100644
index 0000000..7bba3fb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+public class TaskConfig {
+    // Settings of one source task; filled through the setters by ConfigUtil.load in RmqSourceTask.start.
+    private String storeTopic; // queue name given to each SourceDataEntry built by RmqSourceTask
+    private String sourceTopic; // topic whose queues RmqSourceTask fetches and pulls from
+    private String sourceRocketmq; // name server address passed to the pull consumer
+    private Integer dataType; // compared with DataType ordinals in RmqSourceTask.poll()
+    private String brokerName; // used by RmqSourceTask when building the partition key
+    private String queueId; // "" tracks every queue; otherwise only queues with this id are tracked
+    private Long nextPosition; // offset RmqSourceTask records as the next one to pull
+
+    public String getStoreTopic() {
+        return storeTopic;
+    }
+
+    public void setStoreTopic(String storeTopic) {
+        this.storeTopic = storeTopic;
+    }
+
+    public String getSourceTopic() {
+        return sourceTopic;
+    }
+
+    public void setSourceTopic(String sourceTopic) {
+        this.sourceTopic = sourceTopic;
+    }
+
+    public String getSourceRocketmq() {
+        return sourceRocketmq;
+    }
+
+    public void setSourceRocketmq(String sourceRocketmq) {
+        this.sourceRocketmq = sourceRocketmq;
+    }
+
+    public int getDataType() {
+        return dataType; // NOTE(review): unboxes the Integer field; throws NullPointerException while it is unset
+    }
+
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
+    }
+
+    public void setDataType(Integer dataType) {
+        this.dataType = dataType;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(String queueId) {
+        this.queueId = queueId;
+    }
+
+    public Long getNextPosition() {
+        return nextPosition;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java b/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
new file mode 100644
index 0000000..9f4dc47
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.schema;
+
+/**
+ * Enumerates field names together with the string key each one maps to.
+ */
+public enum FieldName {
+    /** Field whose key is {@code "MessageExt"}. */
+    COMMON_MESSAGE("MessageExt");
+
+    /** Key string of this constant; assigned once in the constructor and never changed. */
+    private final String key;
+
+    FieldName(String key) {
+        this.key = key;
+    }
+
+    /**
+     * Returns the key string bound to this constant.
+     *
+     * @return the key given to the constructor, e.g. {@code "MessageExt"} for {@link #COMMON_MESSAGE}
+     */
+    public String getKey() {
+        return key;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..9dc060f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+/**
+ * Names the available ways of dividing work into tasks.
+ *
+ * <p>The declaration order is kept as is, because it determines each constant's ordinal.
+ * NOTE(review): presumably used to choose between {@code DivideTaskByTopic} and
+ * {@code DivideTaskByQueue} - confirm at the call site.
+ */
+public enum DivideStrategyEnum {
+    /** Divide by topic. */
+    BY_TOPIC,
+    /** Divide by queue. */
+    BY_QUEUE;
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
new file mode 100644
index 0000000..b6eae8f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task division strategy that emits one task configuration per message queue.
+ */
+public class DivideTaskByQueue extends TaskDivideStrategy {
+
+    /**
+     * Builds one {@link KeyValue} for every queue of every topic in the route map.
+     *
+     * @param topicRouteMap source topic name mapped to the message queues of that topic
+     * @param source value stored under {@link ConfigDefine#SOURCE_RMQ}
+     * @param storeTopic value stored under {@link ConfigDefine#STORE_TOPIC}
+     * @return one configuration per queue, in the map's iteration order; empty when the map is empty
+     */
+    @Override
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+
+        for (Map.Entry<String, List<MessageQueue>> route : topicRouteMap.entrySet()) {
+            String topic = route.getKey();
+            for (MessageQueue mq : route.getValue()) {
+                KeyValue keyValue = new DefaultKeyValue();
+                // STORE_TOPIC is written exactly once, with the caller's storeTopic. A second put
+                // with the source topic name used to overwrite it and made the parameter useless.
+                keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
+                keyValue.put(ConfigDefine.SOURCE_RMQ, source);
+                keyValue.put(ConfigDefine.BROKER_NAME, mq.getBrokerName());
+                // The queue id is stored in its string form.
+                keyValue.put(ConfigDefine.QUEUE_ID, String.valueOf(mq.getQueueId()));
+                keyValue.put(ConfigDefine.SOURCE_TOPIC, topic);
+                // The data type is passed as the enum ordinal.
+                keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+                config.add(keyValue);
+            }
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..7c96b28
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task division strategy that emits one task configuration per topic.
+ */
+public class DivideTaskByTopic extends TaskDivideStrategy {
+
+    /**
+     * Builds one {@link KeyValue} for every topic key in the route map.
+     *
+     * <p>Only the map's keys are read; broker name and queue id are written as empty strings.
+     *
+     * @param topicRouteMap source topic name mapped to the message queues of that topic
+     * @param source value stored under {@link ConfigDefine#SOURCE_RMQ}
+     * @param storeTopic value stored under {@link ConfigDefine#STORE_TOPIC}
+     * @return one configuration per topic, in the key set's iteration order; empty when the map is empty
+     */
+    @Override
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+
+        for (String topic : topicRouteMap.keySet()) {
+            KeyValue keyValue = new DefaultKeyValue();
+            // STORE_TOPIC is written exactly once, with the caller's storeTopic. A second put with
+            // the source topic name used to overwrite it and made the parameter useless.
+            keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
+            keyValue.put(ConfigDefine.SOURCE_RMQ, source);
+            keyValue.put(ConfigDefine.BROKER_NAME, "");
+            keyValue.put(ConfigDefine.QUEUE_ID, "");
+            keyValue.put(ConfigDefine.SOURCE_TOPIC, topic);
+            // The data type is passed as the enum ordinal.
+            keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..f847cb1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base type for strategies that turn a topic-to-queues map into a list of task configurations.
+ */
+public abstract class TaskDivideStrategy {
+
+    /**
+     * Produces the task configurations for the given topics.
+     *
+     * @param topicMap topic name mapped to the message queues of that topic
+     * @param source source value supplied by the caller
+     * @param storeTopic store topic supplied by the caller
+     * @return the generated configurations, one {@link KeyValue} per task
+     */
+    public abstract List<KeyValue> divide(
+        Map<String, List<MessageQueue>> topicMap,
+        String source,
+        String storeTopic);
+}