You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:37 UTC

[rocketmq-connect] 29/43: [ISSUE #489] JDBC Connector support divide task by topic strategy (#490)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 341d6f356047b0fe908a2aad1ca901cdd7317aef
Author: Xiongmengfei <Xi...@163.com>
AuthorDate: Tue Dec 31 15:21:39 2019 +0800

    [ISSUE #489] JDBC Connector support divide task by topic strategy (#490)
    
    * [ISSUE #489] JDBC Connector support divide task by topic strategy
    
    * [ISSUE #489] JDBC Connector support divide task by topic strategy
---
 .../rocketmq/connect/jdbc/config/Config.java       |  70 +++++++++----
 .../rocketmq/connect/jdbc/config/DataType.java     |  26 +++++
 .../connect/jdbc/config/DbConnectorConfig.java     |  84 ++++++++++++++++
 .../connect/jdbc/config/SinkDbConnectorConfig.java |  67 ++++++++++++
 .../jdbc/config/SourceDbConnectorConfig.java       |  73 ++++++++++++++
 .../connect/jdbc/config/TaskDivideConfig.java      | 112 +++++++++++++++++++++
 .../connect/jdbc/config/TaskTopicInfo.java         |  37 +++++++
 .../connect/jdbc/connector/JdbcSinkConnector.java  |  39 +++++--
 .../connect/jdbc/connector/JdbcSinkTask.java       |   1 +
 .../jdbc/connector/JdbcSourceConnector.java        |  37 +++++--
 .../connect/jdbc/connector/JdbcSourceTask.java     |  13 +--
 .../connect/jdbc/strategy/DivideStrategyEnum.java  |  23 +++++
 .../jdbc/strategy/DivideTaskByConsistentHash.java  |  82 +++++++++++++++
 .../connect/jdbc/strategy/DivideTaskByQueue.java   |  62 ++++++++++++
 .../connect/jdbc/strategy/DivideTaskByTopic.java   | 104 +++++++++++++++++++
 .../connect/jdbc/strategy/TaskDivideStrategy.java  |  32 ++++++
 16 files changed, 822 insertions(+), 40 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index 5491d43..91a3e51 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.jdbc.config;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,9 +29,11 @@ public class Config {
     private static final Logger LOG = LoggerFactory.getLogger(Config.class);
 
     /* Database Connection Config */
-    private String jdbcUrl;
-    private String jdbcUsername;
-    private String jdbcPassword;
+    private String dbUrl;
+    private String dbPort;
+    private String dbUsername;
+    private String dbPassword;
+    private String dataType;
     private String rocketmqTopic;
     private String jdbcBackoff;
     private String jdbcAttempts;
@@ -44,6 +47,18 @@ public class Config {
     private String whiteDataBase;
     private String whiteTable;
 
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static final String CONN_DB_IP = "dbUrl";
+    public static final String CONN_DB_PORT = "dbPort";
+    public static final String CONN_DB_USERNAME = "dbUsername";
+    public static final String CONN_DB_PASSWORD = "dbPassword";
+    public static final String CONN_DATA_TYPE = "dataType";
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_DB_MODE = "mode";
+
     /* Mode Config */
     private String mode = "";
     private String incrementingColumnName = "";
@@ -57,15 +72,16 @@ public class Config {
     private int batchMaxRows = 100;
     private long tablePollInterval = 60000;
     private long timestampDelayInterval = 0;
-    private String dbTimezone = "UTC";
+    private String dbTimezone = "GMT+8";
     private String queueName;
 
     private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
-            add("jdbcUrl");
-            add("jdbcUsername");
-            add("jdbcPassword");
+            add("dbUrl");
+            add("dbPort");
+            add("dbUsername");
+            add("dbPassword");
             add("mode");
             add("rocketmqTopic");
         }
@@ -79,28 +95,44 @@ public class Config {
         this.queueName = queueName;
     }
 
-    public String getJdbcUrl() {
-        return jdbcUrl;
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUsername() {
+        return dbUsername;
     }
 
-    public void setJdbcUrl(String jdbcUrl) {
-        this.jdbcUrl = jdbcUrl;
+    public void setDbUsername(String dbUsername) {
+        this.dbUsername = dbUsername;
     }
 
-    public String getJdbcUsername() {
-        return jdbcUsername;
+    public String getDbPassword() {
+        return dbPassword;
     }
 
-    public void setJdbcUsername(String jdbcUsername) {
-        this.jdbcUsername = jdbcUsername;
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
     }
 
-    public String getJdbcPassword() {
-        return jdbcPassword;
+    public String getDataType() {
+        return dataType;
     }
 
-    public void setJdbcPassword(String jdbcPassword) {
-        this.jdbcPassword = jdbcPassword;
+    public void setDataType(String dataType) {
+        this.dataType = dataType;
     }
 
     public String getRocketmqTopic() {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java
new file mode 100644
index 0000000..ef7408a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+public enum DataType {
+
+    COMMON_MESSAGE,
+    TOPIC_CONFIG,
+    BROKER_CONFIG,
+    SUB_CONFIG,
+    OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java
new file mode 100644
index 0000000..43bd165
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java
@@ -0,0 +1,84 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.TaskDivideStrategy;
+
+public abstract class DbConnectorConfig {
+
+    public TaskDivideStrategy taskDivideStrategy;
+    public String dbUrl;
+    public String dbPort;
+    public String dbUserName;
+    public String dbPassword;
+    public String converter;
+    public int taskParallelism;
+    public String mode;
+
+    public abstract void validate(KeyValue config);
+
+    public abstract <T> T getWhiteTopics();
+
+    public TaskDivideStrategy getTaskDivideStrategy() {
+        return taskDivideStrategy;
+    }
+
+    public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getConverter() {
+        return converter;
+    }
+
+    public void setConverter(String converter) {
+        this.converter = converter;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
new file mode 100644
index 0000000..851b253
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -0,0 +1,67 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SinkDbConnectorConfig extends DbConnectorConfig{
+
+    private Set<String> whiteList;
+
+    public SinkDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+        if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+            this.taskDivideStrategy = new DivideTaskByQueue();
+        } else {
+            this.taskDivideStrategy = new DivideTaskByTopic();
+        }
+
+        buildWhiteList(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    @Override
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
new file mode 100644
index 0000000..801e411
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+    private Map<String, String> whiteMap;
+
+    public SourceDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+        if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+            this.taskDivideStrategy = new DivideTaskByQueue();
+        } else {
+            this.taskDivideStrategy = new DivideTaskByTopic();
+        }
+
+        buildWhiteMap(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    private void buildWhiteMap(KeyValue config) {
+        this.whiteMap = new HashMap<>(16);
+        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+        if(whiteDataBaseObject.keySet().size() <= 0){
+            throw new IllegalArgumentException("white data base must be not empty.");
+        }else {
+            this.whiteMap.clear();
+            for (String dbName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName);
+                for (String tableName : whiteTableObject.keySet()){
+                    String dbTableKey = dbName + "-" + tableName;
+                    this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+                }
+            }
+        }
+    }
+
+
+    public Map<String, String> getWhiteMap() {
+        return whiteMap;
+    }
+
+    public void setWhiteMap(Map<String, String> whiteMap) {
+        this.whiteMap = whiteMap;
+    }
+
+    @Override
+    public Map<String, String> getWhiteTopics() {
+        return getWhiteMap();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java
new file mode 100644
index 0000000..8b15a2f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+public class TaskDivideConfig {
+
+    private String dbUrl;
+
+    private String dbPort;
+
+    private String dbUserName;
+
+    private String dbPassword;
+
+    private String srcRecordConverter;
+
+    private int dataType;
+
+    private int taskParallelism;
+
+    private String mode;
+
+    public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String srcRecordConverter,
+                            int dataType, int taskParallelism, String mode) {
+        this.dbUrl = dbUrl;
+        this.dbPort = dbPort;
+        this.dbUserName = dbUserName;
+        this.dbPassword = dbPassword;
+        this.srcRecordConverter = srcRecordConverter;
+        this.dataType = dataType;
+        this.taskParallelism = taskParallelism;
+        this.mode = mode;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
+    }
+
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
+    }
+
+    public int getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java
new file mode 100644
index 0000000..5c2a21e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class TaskTopicInfo extends MessageQueue {
+
+    private String targetTopic;
+
+    public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+        super(sourceTopic, brokerName, queueId);
+        this.targetTopic = targetTopic;
+    }
+
+    public String getTargetTopic() {
+        return this.targetTopic;
+    }
+
+    public void setTargetTopic(String targetTopic) {
+        this.targetTopic = targetTopic;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index e1d1256..935ad52 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,15 +3,23 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.sink.SinkConnector;
-import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 
 public class JdbcSinkConnector extends SinkConnector {
-
+    private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
     private KeyValue config;
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+
+    public JdbcSinkConnector(){
+        dbConnectorConfig = new SinkDbConnectorConfig();
+    }
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
@@ -20,7 +28,13 @@ public class JdbcSinkConnector extends SinkConnector {
                 return "Request config key: " + requestKey;
             }
         }
-        this.config = config;
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
         return "";
     }
 
@@ -51,8 +65,21 @@ public class JdbcSinkConnector extends SinkConnector {
 
     @Override
     public List<KeyValue> taskConfigs() {
-        List<KeyValue> config = new ArrayList<>();
-        config.add(this.config);
-        return config;
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+                this.dbConnectorConfig.getDbUrl(),
+                this.dbConnectorConfig.getDbPort(),
+                this.dbConnectorConfig.getDbUserName(),
+                this.dbConnectorConfig.getDbPassword(),
+                this.dbConnectorConfig.getConverter(),
+                DataType.COMMON_MESSAGE.ordinal(),
+                this.dbConnectorConfig.getTaskParallelism(),
+                this.dbConnectorConfig.getMode()
+        );
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index 4b55e5a..31f43e3 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -114,6 +114,7 @@ public class JdbcSinkTask extends SinkTask {
         try {
             if (connection != null){
                 connection.close();
+                log.info("jdbc sink task connection is closed.");
             }
         } catch (Throwable e) {
             log.warn("sink task stop error while closing connection to {}", "jdbc", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 796f0e6..a083e84 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,19 +30,29 @@ import io.openmessaging.connector.api.source.SourceConnector;
 
 public class JdbcSourceConnector extends SourceConnector {
     private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
-    private KeyValue config;
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+
+    public JdbcSourceConnector() {
+        dbConnectorConfig = new SourceDbConnectorConfig();
+    }
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
+        log.info("JdbcSourceConnector verifyAndSetConfig enter");
         for (String requestKey : Config.REQUEST_CONFIG) {
 
             if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
         }
-        this.config = config;
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
 
         return "";
     }
@@ -75,8 +85,21 @@ public class JdbcSourceConnector extends SourceConnector {
     @Override
     public List<KeyValue> taskConfigs() {
         log.info("List.start");
-        List<KeyValue> config = new ArrayList<>();
-        config.add(this.config);
-        return config;
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+                this.dbConnectorConfig.getDbUrl(),
+                this.dbConnectorConfig.getDbPort(),
+                this.dbConnectorConfig.getDbUserName(),
+                this.dbConnectorConfig.getDbPassword(),
+                this.dbConnectorConfig.getConverter(),
+                DataType.COMMON_MESSAGE.ordinal(),
+                this.dbConnectorConfig.getTaskParallelism(),
+                this.dbConnectorConfig.getMode()
+        );
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
     }
+
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 943d432..d533395 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -22,11 +22,7 @@ import io.openmessaging.connector.api.source.SourceTask;
 
 import java.nio.ByteBuffer;
 import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -82,7 +78,7 @@ public class JdbcSourceTask extends SourceTask {
                 querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
             else
                 querier = tableQueue.peek();
-            Timer timer = new java.util.Timer();
+            Timer timer = new Timer();
             try {
                 Thread.currentThread();
                 Thread.sleep(1000);//毫秒
@@ -114,7 +110,7 @@ public class JdbcSourceTask extends SourceTask {
                 }
 
                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                        ByteBuffer.wrap(config.getJdbcUrl().getBytes("UTF-8")),
+                        ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")),
                         ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
                 res.add(sourceDataEntry);
                 log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
@@ -163,8 +159,9 @@ public class JdbcSourceTask extends SourceTask {
     @Override
     public void stop() {
         try {
-            if (connection != null){
+            if (connection != null) {
                 connection.close();
+                log.info("jdbc source task connection is closed.");
             }
         } catch (Throwable e) {
             log.warn("source task stop error while closing connection to {}", "jdbc", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..0afa470
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+public enum DivideStrategyEnum {
+
+    BY_TOPIC,
+    BY_QUEUE
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
new file mode 100644
index 0000000..bac7358
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.jdbc.strategy;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+
+import java.util.*;
+
+public class DivideTaskByConsistentHash extends TaskDivideStrategy {
+    @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+
+        List<KeyValue> config = new ArrayList<>();
+        int parallelism = tdc.getTaskParallelism();
+        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
+        int id = -1;
+
+        Collection<ClientNode> cidNodes = new ArrayList<>();
+        for (int i = 0; i < parallelism; i++) {
+            cidNodes.add(new ClientNode(i, Integer.toString(i)));
+            queueTopicList.put(i, new ArrayList<>());
+        }
+
+        ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size());
+
+        for (String t : topicMap.keySet()) {
+            for (TaskTopicInfo queue : topicMap.get(t)) {
+                ClientNode clientNode = router.routeNode(queue.toString());
+                if (clientNode != null) {
+                    queueTopicList.get(clientNode.index).add(queue);
+                }
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+    private static class ClientNode implements Node {
+        private final String clientID;
+        private final int index;
+
+        public ClientNode(int index, String clientID) {
+            this.index = index;
+            this.clientID = clientID;
+        }
+
+        @Override
+        public String getKey() {
+            return clientID;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
new file mode 100644
index 0000000..7ef5c31
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DivideTaskByQueue extends TaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+        int id = -1;
+        for (String t : topicRouteMap.keySet()) {
+            for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
+                int ind = ++id % parallelism;
+                if (!queueTopicList.containsKey(ind)) {
+                    queueTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+                }
+                queueTopicList.get(ind).add(taskTopicInfo);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..762c7a0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+
+import java.util.*;
+
+public class DivideTaskByTopic extends TaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        if (dbConnectorConfig instanceof SourceDbConnectorConfig){
+            return divideSourceTaskByTopic(dbConnectorConfig, tdc);
+        }else {
+            return divideSinkTaskByTopic(dbConnectorConfig, tdc);
+        }
+
+    }
+
+    private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+        for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new HashMap<>());
+            }
+            String dbKey = entry.getKey().split("-")[0];
+            String tableKey = entry.getKey().split("-")[1];
+            String filter = entry.getValue();
+            Map<String, String> tableMap = new HashMap<>();
+            tableMap.put(tableKey, filter);
+            taskTopicList.get(ind).put(dbKey, tableMap);
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+    private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, String> taskTopicList = new HashMap<>();
+        for (String topicName : topicRouteSet) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, topicName);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..736fcac
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class TaskDivideStrategy {
+
+    public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc);
+
+}