You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:44 UTC

[rocketmq-connect] 36/43: [ISSUE #545]bug fix (#546)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 0b56e472982adb92ba7fa52bf2d3c9b5f326e969
Author: mike_xwm <mi...@126.com>
AuthorDate: Wed Apr 1 13:54:08 2020 +0800

    [ISSUE #545]bug fix (#546)
    
    Co-authored-by: MajorHe1 <he...@gmail.com>
---
 pom.xml                                            |  2 +
 .../rocketmq/connect/jdbc/common/CloneUtils.java   | 28 +++++++++++
 .../rocketmq/connect/jdbc/common/ConstDefine.java  |  2 +-
 .../rocketmq/connect/jdbc/common/DBUtils.java      |  2 +-
 .../rocketmq/connect/jdbc/config/Config.java       | 31 ------------
 .../connect/jdbc/config/SinkDbConnectorConfig.java | 11 +++--
 .../jdbc/config/SourceDbConnectorConfig.java       |  4 +-
 .../connect/jdbc/connector/JdbcSinkConnector.java  | 56 ++++++++++++----------
 .../connect/jdbc/connector/JdbcSourceTask.java     |  9 ++--
 .../apache/rocketmq/connect/jdbc/sink/Updater.java | 22 +++++----
 .../rocketmq/connect/jdbc/source/Querier.java      |  7 ++-
 .../jdbc/source/TimestampIncrementingQuerier.java  |  6 +--
 .../connect/jdbc/strategy/DivideTaskByQueue.java   |  3 +-
 .../connect/jdbc/strategy/DivideTaskByTopic.java   | 14 ++++--
 14 files changed, 107 insertions(+), 90 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9df23c4..61680f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
             <version>0.1.1</version>
+            <scope>provided</scope>
         </dependency>
 		<dependency>
 			<groupId>io.openmessaging</groupId>
@@ -264,6 +265,7 @@
             <artifactId>druid</artifactId>
             <version>1.0.31</version>
         </dependency>
+
     </dependencies>
 
 </project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java
new file mode 100644
index 0000000..f0ff98e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java
@@ -0,0 +1,28 @@
+package org.apache.rocketmq.connect.jdbc.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class CloneUtils {
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        T clonedObj = null;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(obj);
+            oos.close();
+
+            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            clonedObj = (T) ois.readObject();
+            ois.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return clonedObj;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
index f49d367..e6d2f7a 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
@@ -19,5 +19,5 @@ package org.apache.rocketmq.connect.jdbc.common;
 public class ConstDefine {
 
     public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN";
-
+    public static final String PREFIX = "jdbc";
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
index ab58153..31a86d1 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
@@ -196,7 +196,7 @@ public class DBUtils {
         map.put("username", config.getDbUsername());
         map.put("password", config.getDbPassword());
         map.put("initialSize", "1");
-        map.put("maxActive", "1");
+        map.put("maxActive", "2");
         map.put("maxWait", "60000");
         map.put("timeBetweenEvictionRunsMillis", "60000");
         map.put("minEvictableIdleTimeMillis", "300000");
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index cca1aa5..9162bad 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -78,9 +78,6 @@ public class Config {
     private long timestampDelayInterval = 0;
     private String dbTimezone = "GMT+8";
     private String queueName;
-    private String jdbcUrl;
-    private String jdbcUsername;
-    private String jdbcPassword;
 
     private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
@@ -327,32 +324,4 @@ public class Config {
     public void setWhiteTable(String whiteTable) {
         this.whiteTable = whiteTable;
     }
-
-    public void setPollInterval(long pollInterval) {
-        this.pollInterval = pollInterval;
-    }
-
-    public String getJdbcUrl() {
-        return jdbcUrl;
-    }
-
-    public void setJdbcUrl(String jdbcUrl) {
-        this.jdbcUrl = jdbcUrl;
-    }
-
-    public String getJdbcUsername() {
-        return jdbcUsername;
-    }
-
-    public void setJdbcUsername(String jdbcUsername) {
-        this.jdbcUsername = jdbcUsername;
-    }
-
-    public String getJdbcPassword() {
-        return jdbcPassword;
-    }
-
-    public void setJdbcPassword(String jdbcPassword) {
-        this.jdbcPassword = jdbcPassword;
-    }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
index 3ff4f71..26b1541 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -16,16 +16,16 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
     private String srcNamesrvs;
     private String srcCluster;
     private long refreshInterval;
-    private Map<String, List<TaskTopicInfo>> topicRouteMap;
+    private Map<String, Set<TaskTopicInfo>> topicRouteMap;
 
     public SinkDbConnectorConfig(){
     }
 
     @Override
     public void validate(KeyValue config) {
-        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
 
-        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
         if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
             this.taskDivideStrategy = new DivideTaskByQueue();
         } else {
@@ -43,6 +43,7 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
         this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
         this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
         this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
 
     }
 
@@ -81,11 +82,11 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
         return this.refreshInterval;
     }
 
-    public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+    public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
         return topicRouteMap;
     }
 
-    public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) {
+    public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) {
         this.topicRouteMap = topicRouteMap;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
index 801e411..4972739 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
@@ -18,9 +18,9 @@ public class SourceDbConnectorConfig extends DbConnectorConfig{
 
     @Override
     public void validate(KeyValue config) {
-        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
 
-        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
         if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
             this.taskDivideStrategy = new DivideTaskByQueue();
         } else {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 0f818ee..6a41646 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,46 +3,48 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.sink.SinkConnector;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.jdbc.common.CloneUtils;
 import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
 import org.apache.rocketmq.connect.jdbc.common.Utils;
-import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.DataType;
+import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.SinkDbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 
 public class JdbcSinkConnector extends SinkConnector {
     private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
     private DbConnectorConfig dbConnectorConfig;
     private volatile boolean configValid = false;
     private ScheduledExecutorService executor;
-    private Map<String, List<TaskTopicInfo>> topicRouteMap;
+    private HashMap<String, Set<TaskTopicInfo>> topicRouteMap;
 
     private DefaultMQAdminExt srcMQAdminExt;
 
     private volatile boolean adminStarted;
 
     public JdbcSinkConnector() {
-        topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+        topicRouteMap = new HashMap<>();
         dbConnectorConfig = new SinkDbConnectorConfig();
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build());
     }
@@ -93,30 +95,34 @@ public class JdbcSinkConnector extends SinkConnector {
 
     public void startListener() {
         executor.scheduleAtFixedRate(new Runnable() {
+            boolean first = true;
+            HashMap<String, Set<TaskTopicInfo>> origin = null;
+
             @Override
             public void run() {
-                Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
-                topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
-
                 buildRoute();
-
+                if (first) {
+                    origin = CloneUtils.clone(topicRouteMap);
+                    first = false;
+                }
                 if (!compare(origin, topicRouteMap)) {
                     context.requestTaskReconfiguration();
+                    origin = CloneUtils.clone(topicRouteMap);
                 }
             }
         }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
     }
 
-    public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+    public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
         if (origin.size() != updated.size()) {
             return false;
         }
-        for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+        for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
             if (!updated.containsKey(entry.getKey())) {
                 return false;
             }
-            List<TaskTopicInfo> originTasks = entry.getValue();
-            List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            Set<TaskTopicInfo> originTasks = entry.getValue();
+            Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
             if (originTasks.size() != updateTasks.size()) {
                 return false;
             }
@@ -145,7 +151,7 @@ public class JdbcSinkConnector extends SinkConnector {
 
                 TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
                 if (!topicRouteMap.containsKey(topic)) {
-                    topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+                    topicRouteMap.put(topic, new HashSet<>(16));
                 }
                 for (QueueData qd : topicRouteData.getQueueDatas()) {
                     if (brokerNameSet.contains(qd.getBrokerName())) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index d533395..f36623f 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -21,12 +21,15 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import io.openmessaging.connector.api.source.SourceTask;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
 import org.apache.rocketmq.connect.jdbc.config.Config;
 import org.apache.rocketmq.connect.jdbc.common.DBUtils;
 import org.apache.rocketmq.connect.jdbc.config.ConfigUtil;
@@ -105,13 +108,13 @@ public class JdbcSourceTask extends SourceTask {
                         .entryType(EntryType.UPDATE);
                 for (int i = 0; i < dataRow.getColList().size(); i++) {
                     Object[] value = new Object[2];
-                    value[0] = value[1] = dataRow.getDataList().get(i);
+                    value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
                     dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
                 }
 
                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                        ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")),
-                        ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+                        ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+                        ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
                 res.add(sourceDataEntry);
                 log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
             }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index e30c65f..9feffe6 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -49,7 +49,7 @@ public class Updater {
                     isSuccess = true;
                     // 再查原有数据是否存在,存在则删除
                     beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
-                    if (beforeUpdateId != 0){
+                    if (beforeUpdateId != 0 && afterUpdateId != beforeUpdateId){
                        isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
                     }
                     break;
@@ -107,7 +107,7 @@ public class Updater {
         ResultSet rs;
         PreparedStatement stmt;
         Boolean finishQuery = false;
-        String query = "select id from " + dbName + "." + tableName + " where ";
+        String query = "select id from " + dbName + "." + tableName + " where 1=1";
 
         for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
             count ++;
@@ -116,7 +116,7 @@ public class Updater {
             Object fieldValue = entry.getValue()[0];
             if ("id".equals(fieldName))
                 continue;
-            if (count != 1) {
+            if (count <=fieldMap.size()) {
                 query += " and ";
             }
             if (fieldValue == null)
@@ -150,7 +150,7 @@ public class Updater {
         ResultSet rs;
         PreparedStatement stmt;
         Boolean finishQuery = false;
-        String query = "select id from " + dbName + "." + tableName + " where ";
+        String query = "select id from " + dbName + "." + tableName + " where 1=1";
 
         for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
             count ++;
@@ -159,7 +159,7 @@ public class Updater {
             Object fieldValue = entry.getValue()[1];
             if ("id".equals(fieldName))
                 continue;
-            if (count != 1) {
+            if (count <=fieldMap.size()) {
                 query += " and ";
             }
             if (fieldValue == null)
@@ -200,19 +200,21 @@ public class Updater {
             FieldType fieldType = entry.getKey().getType();
             Object fieldValue = entry.getValue()[1];
             if ("id".equals(fieldName)) {
-                if (id == 0)
+                if (id == 0){
+                    if(count==fieldMap.size()) update = update.substring(0,update.length()-1);
                     continue;
-                else
+                }else{
                     fieldValue = id;
-            }
-            if (count != 1) {
-                update += ", ";
+                }
             }
             if (fieldValue == null) {
                 update += fieldName + " = NULL";
             } else {
                 update = typeParser(fieldType, fieldName, fieldValue, update);
             }
+            if(count<fieldMap.size()){
+                update += ",";
+            }
         }
 
         try {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index d2544f9..03447a8 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -92,12 +92,12 @@ public class Querier {
     public void poll()  {
         try {
             PreparedStatement stmt;
-            StringBuilder query = new StringBuilder("select * from ");
             LinkedList<Table> tableLinkedList = new LinkedList<>();
             for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
                 String db = entry.getKey();
                 Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator();
                 while (iterator.hasNext()) {
+                    StringBuilder query = new StringBuilder("select * from ");
                     Map.Entry<String, Table> tableEntry = iterator.next();
                     String tb = tableEntry.getKey();
                     query.append(db + "." + tb);
@@ -116,7 +116,7 @@ public class Querier {
                             query.append(condition);
                         }
                     }
-                    stmt = connection.prepareStatement(query + db + "." + tb);
+                    stmt = connection.prepareStatement(query.toString());
                     ResultSet rs;
                     rs = stmt.executeQuery();
                     List<String> colList = tableEntry.getValue().getColList();
@@ -158,7 +158,7 @@ public class Querier {
                 for (String whiteTableName : whiteTableObject.keySet()){
                     Collections.addAll(whiteTableSet, whiteTableName);
                     HashMap<String, String> filterMap = new HashMap<>();
-                    JSONObject tableFilterObject = (JSONObject)whiteTableObject.get(whiteTableName);
+                    JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString());
                     for(String filterKey : tableFilterObject.keySet()){
                         filterMap.put(filterKey, tableFilterObject.getString(filterKey));
                     }
@@ -170,5 +170,4 @@ public class Querier {
         schema.load();
         log.info("load schema success");
     }
-
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
index 964322d..0ab72df 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
@@ -288,9 +288,9 @@ public class TimestampIncrementingQuerier extends Querier {
         incrementingColumn = config.getIncrementingColumnName();
         map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
         map.put("url",
-                "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
-        map.put("username", config.getJdbcUsername());
-        map.put("password", config.getJdbcPassword());
+                "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() +"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.getDbUsername());
+        map.put("password", config.getDbPassword());
         map.put("initialSize", "2");
         map.put("maxActive", "2");
         map.put("maxWait", "60000");
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
index 797710a..9d23fd2 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.strategy;
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.Set;
 import org.apache.rocketmq.connect.jdbc.config.*;
 
 import java.util.ArrayList;
@@ -41,7 +42,7 @@ public class DivideTaskByQueue extends TaskDivideStrategy {
         List<KeyValue> config = new ArrayList<KeyValue>();
         int parallelism = tdc.getTaskParallelism();
         Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
-        Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
+        Map<String, Set<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
         int id = -1;
         for (String t : topicRouteMap.keySet()) {
             for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
index 762c7a0..c1d5020 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -52,7 +52,11 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
             String filter = entry.getValue();
             Map<String, String> tableMap = new HashMap<>();
             tableMap.put(tableKey, filter);
-            taskTopicList.get(ind).put(dbKey, tableMap);
+            if(!taskTopicList.get(ind).containsKey(dbKey)){
+                taskTopicList.get(ind).put(dbKey, tableMap);
+            }else {
+                taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+            }
         }
 
         for (int i = 0; i < parallelism; i++) {
@@ -77,11 +81,13 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
         int parallelism = tdc.getTaskParallelism();
         int id = -1;
         Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
-        Map<Integer, String> taskTopicList = new HashMap<>();
+        Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
         for (String topicName : topicRouteSet) {
             int ind = ++id % parallelism;
             if (!taskTopicList.containsKey(ind)) {
-                taskTopicList.put(ind, topicName);
+                taskTopicList.put(ind, new StringBuilder(topicName));
+            }else {
+                taskTopicList.get(ind).append(",").append(topicName);
             }
         }
 
@@ -91,7 +97,7 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
             keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
             keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
             keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
-            keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
             keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
             keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
             keyValue.put(Config.CONN_DB_MODE, tdc.getMode());