You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:44 UTC
[rocketmq-connect] 36/43: [ISSUE #545]bug fix (#546)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 0b56e472982adb92ba7fa52bf2d3c9b5f326e969
Author: mike_xwm <mi...@126.com>
AuthorDate: Wed Apr 1 13:54:08 2020 +0800
[ISSUE #545]bug fix (#546)
Co-authored-by: MajorHe1 <he...@gmail.com>
---
pom.xml | 2 +
.../rocketmq/connect/jdbc/common/CloneUtils.java | 28 +++++++++++
.../rocketmq/connect/jdbc/common/ConstDefine.java | 2 +-
.../rocketmq/connect/jdbc/common/DBUtils.java | 2 +-
.../rocketmq/connect/jdbc/config/Config.java | 31 ------------
.../connect/jdbc/config/SinkDbConnectorConfig.java | 11 +++--
.../jdbc/config/SourceDbConnectorConfig.java | 4 +-
.../connect/jdbc/connector/JdbcSinkConnector.java | 56 ++++++++++++----------
.../connect/jdbc/connector/JdbcSourceTask.java | 9 ++--
.../apache/rocketmq/connect/jdbc/sink/Updater.java | 22 +++++----
.../rocketmq/connect/jdbc/source/Querier.java | 7 ++-
.../jdbc/source/TimestampIncrementingQuerier.java | 6 +--
.../connect/jdbc/strategy/DivideTaskByQueue.java | 3 +-
.../connect/jdbc/strategy/DivideTaskByTopic.java | 14 ++++--
14 files changed, 107 insertions(+), 90 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9df23c4..61680f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.1</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
@@ -264,6 +265,7 @@
<artifactId>druid</artifactId>
<version>1.0.31</version>
</dependency>
+
</dependencies>
</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java
new file mode 100644
index 0000000..f0ff98e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java
@@ -0,0 +1,41 @@
+package org.apache.rocketmq.connect.jdbc.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Deep-copy helper that clones an object graph through a Java serialization round trip.
+ */
+public class CloneUtils {
+    /**
+     * Returns a deep copy of {@code obj}.
+     *
+     * @param obj the object to copy; everything reachable from it must be serializable
+     * @param <T> the type of the object being copied
+     * @return the copy, or {@code null} if {@code obj} is {@code null} or copying failed
+     */
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // try-with-resources closes the stream even when writeObject throws.
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(obj);
+        } catch (Exception e) {
+            // Best effort, as before: report the failure and hand back null.
+            e.printStackTrace();
+            return null;
+        }
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+            // Unchecked cast is safe: the stream holds exactly what was written above, a T.
+            return (T) ois.readObject();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
index f49d367..e6d2f7a 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java
@@ -19,5 +19,5 @@ package org.apache.rocketmq.connect.jdbc.common;
public class ConstDefine {
public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN";
-
+ public static final String PREFIX = "jdbc";
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
index ab58153..31a86d1 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java
@@ -196,7 +196,7 @@ public class DBUtils {
map.put("username", config.getDbUsername());
map.put("password", config.getDbPassword());
map.put("initialSize", "1");
- map.put("maxActive", "1");
+ map.put("maxActive", "2");
map.put("maxWait", "60000");
map.put("timeBetweenEvictionRunsMillis", "60000");
map.put("minEvictableIdleTimeMillis", "300000");
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index cca1aa5..9162bad 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -78,9 +78,6 @@ public class Config {
private long timestampDelayInterval = 0;
private String dbTimezone = "GMT+8";
private String queueName;
- private String jdbcUrl;
- private String jdbcUsername;
- private String jdbcPassword;
private Logger log = LoggerFactory.getLogger(Config.class);
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
@@ -327,32 +324,4 @@ public class Config {
public void setWhiteTable(String whiteTable) {
this.whiteTable = whiteTable;
}
-
- public void setPollInterval(long pollInterval) {
- this.pollInterval = pollInterval;
- }
-
- public String getJdbcUrl() {
- return jdbcUrl;
- }
-
- public void setJdbcUrl(String jdbcUrl) {
- this.jdbcUrl = jdbcUrl;
- }
-
- public String getJdbcUsername() {
- return jdbcUsername;
- }
-
- public void setJdbcUsername(String jdbcUsername) {
- this.jdbcUsername = jdbcUsername;
- }
-
- public String getJdbcPassword() {
- return jdbcPassword;
- }
-
- public void setJdbcPassword(String jdbcPassword) {
- this.jdbcPassword = jdbcPassword;
- }
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
index 3ff4f71..26b1541 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -16,16 +16,16 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
private String srcNamesrvs;
private String srcCluster;
private long refreshInterval;
- private Map<String, List<TaskTopicInfo>> topicRouteMap;
+ private Map<String, Set<TaskTopicInfo>> topicRouteMap;
public SinkDbConnectorConfig(){
}
@Override
public void validate(KeyValue config) {
- this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
- int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
this.taskDivideStrategy = new DivideTaskByQueue();
} else {
@@ -43,6 +43,7 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+ this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
}
@@ -81,11 +82,11 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{
return this.refreshInterval;
}
- public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+ public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
return topicRouteMap;
}
- public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) {
+ public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) {
this.topicRouteMap = topicRouteMap;
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
index 801e411..4972739 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
@@ -18,9 +18,9 @@ public class SourceDbConnectorConfig extends DbConnectorConfig{
@Override
public void validate(KeyValue config) {
- this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
- int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
this.taskDivideStrategy = new DivideTaskByQueue();
} else {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 0f818ee..6a41646 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,46 +3,48 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.sink.SinkConnector;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.jdbc.common.CloneUtils;
import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
import org.apache.rocketmq.connect.jdbc.common.Utils;
-import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.DataType;
+import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.SinkDbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
public class JdbcSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
private DbConnectorConfig dbConnectorConfig;
private volatile boolean configValid = false;
private ScheduledExecutorService executor;
- private Map<String, List<TaskTopicInfo>> topicRouteMap;
+ private HashMap<String, Set<TaskTopicInfo>> topicRouteMap;
private DefaultMQAdminExt srcMQAdminExt;
private volatile boolean adminStarted;
public JdbcSinkConnector() {
- topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+ topicRouteMap = new HashMap<>();
dbConnectorConfig = new SinkDbConnectorConfig();
executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build());
}
@@ -93,30 +95,34 @@ public class JdbcSinkConnector extends SinkConnector {
public void startListener() {
executor.scheduleAtFixedRate(new Runnable() {
+ boolean first = true;
+ HashMap<String, Set<TaskTopicInfo>> origin = null;
+
@Override
public void run() {
- Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
- topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
-
buildRoute();
-
+ if (first) {
+ origin = CloneUtils.clone(topicRouteMap);
+ first = false;
+ }
if (!compare(origin, topicRouteMap)) {
context.requestTaskReconfiguration();
+ origin = CloneUtils.clone(topicRouteMap);
}
}
}, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
}
- public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
if (origin.size() != updated.size()) {
return false;
}
- for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+ for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
if (!updated.containsKey(entry.getKey())) {
return false;
}
- List<TaskTopicInfo> originTasks = entry.getValue();
- List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+ Set<TaskTopicInfo> originTasks = entry.getValue();
+ Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
if (originTasks.size() != updateTasks.size()) {
return false;
}
@@ -145,7 +151,7 @@ public class JdbcSinkConnector extends SinkConnector {
TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+ topicRouteMap.put(topic, new HashSet<>(16));
}
for (QueueData qd : topicRouteData.getQueueDatas()) {
if (brokerNameSet.contains(qd.getBrokerName())) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index d533395..f36623f 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -21,12 +21,15 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.connector.api.source.SourceTask;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.connect.jdbc.common.ConstDefine;
import org.apache.rocketmq.connect.jdbc.config.Config;
import org.apache.rocketmq.connect.jdbc.common.DBUtils;
import org.apache.rocketmq.connect.jdbc.config.ConfigUtil;
@@ -105,13 +108,13 @@ public class JdbcSourceTask extends SourceTask {
.entryType(EntryType.UPDATE);
for (int i = 0; i < dataRow.getColList().size(); i++) {
Object[] value = new Object[2];
- value[0] = value[1] = dataRow.getDataList().get(i);
+ value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
}
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")),
- ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+ ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
res.add(sourceDataEntry);
log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index e30c65f..9feffe6 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -49,7 +49,7 @@ public class Updater {
isSuccess = true;
// 再查原有数据是否存在,存在则删除
beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
- if (beforeUpdateId != 0){
+ if (beforeUpdateId != 0 && afterUpdateId != beforeUpdateId){
isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
}
break;
@@ -107,7 +107,7 @@ public class Updater {
ResultSet rs;
PreparedStatement stmt;
Boolean finishQuery = false;
- String query = "select id from " + dbName + "." + tableName + " where ";
+ String query = "select id from " + dbName + "." + tableName + " where 1=1";
for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
count ++;
@@ -116,7 +116,7 @@ public class Updater {
Object fieldValue = entry.getValue()[0];
if ("id".equals(fieldName))
continue;
- if (count != 1) {
+ if (count <=fieldMap.size()) {
query += " and ";
}
if (fieldValue == null)
@@ -150,7 +150,7 @@ public class Updater {
ResultSet rs;
PreparedStatement stmt;
Boolean finishQuery = false;
- String query = "select id from " + dbName + "." + tableName + " where ";
+ String query = "select id from " + dbName + "." + tableName + " where 1=1";
for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
count ++;
@@ -159,7 +159,7 @@ public class Updater {
Object fieldValue = entry.getValue()[1];
if ("id".equals(fieldName))
continue;
- if (count != 1) {
+ if (count <=fieldMap.size()) {
query += " and ";
}
if (fieldValue == null)
@@ -200,19 +200,21 @@ public class Updater {
FieldType fieldType = entry.getKey().getType();
Object fieldValue = entry.getValue()[1];
if ("id".equals(fieldName)) {
- if (id == 0)
+ if (id == 0){
+ if(count==fieldMap.size()) update = update.substring(0,update.length()-1);
continue;
- else
+ }else{
fieldValue = id;
- }
- if (count != 1) {
- update += ", ";
+ }
}
if (fieldValue == null) {
update += fieldName + " = NULL";
} else {
update = typeParser(fieldType, fieldName, fieldValue, update);
}
+ if(count<fieldMap.size()){
+ update += ",";
+ }
}
try {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index d2544f9..03447a8 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -92,12 +92,12 @@ public class Querier {
public void poll() {
try {
PreparedStatement stmt;
- StringBuilder query = new StringBuilder("select * from ");
LinkedList<Table> tableLinkedList = new LinkedList<>();
for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
String db = entry.getKey();
Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator();
while (iterator.hasNext()) {
+ StringBuilder query = new StringBuilder("select * from ");
Map.Entry<String, Table> tableEntry = iterator.next();
String tb = tableEntry.getKey();
query.append(db + "." + tb);
@@ -116,7 +116,7 @@ public class Querier {
query.append(condition);
}
}
- stmt = connection.prepareStatement(query + db + "." + tb);
+ stmt = connection.prepareStatement(query.toString());
ResultSet rs;
rs = stmt.executeQuery();
List<String> colList = tableEntry.getValue().getColList();
@@ -158,7 +158,7 @@ public class Querier {
for (String whiteTableName : whiteTableObject.keySet()){
Collections.addAll(whiteTableSet, whiteTableName);
HashMap<String, String> filterMap = new HashMap<>();
- JSONObject tableFilterObject = (JSONObject)whiteTableObject.get(whiteTableName);
+ JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString());
for(String filterKey : tableFilterObject.keySet()){
filterMap.put(filterKey, tableFilterObject.getString(filterKey));
}
@@ -170,5 +170,4 @@ public class Querier {
schema.load();
log.info("load schema success");
}
-
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
index 964322d..0ab72df 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
@@ -288,9 +288,9 @@ public class TimestampIncrementingQuerier extends Querier {
incrementingColumn = config.getIncrementingColumnName();
map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
map.put("url",
- "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
- map.put("username", config.getJdbcUsername());
- map.put("password", config.getJdbcPassword());
+ "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() +"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+ map.put("username", config.getDbUsername());
+ map.put("password", config.getDbPassword());
map.put("initialSize", "2");
map.put("maxActive", "2");
map.put("maxWait", "60000");
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
index 797710a..9d23fd2 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.strategy;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
+import java.util.Set;
import org.apache.rocketmq.connect.jdbc.config.*;
import java.util.ArrayList;
@@ -41,7 +42,7 @@ public class DivideTaskByQueue extends TaskDivideStrategy {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
- Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
+ Map<String, Set<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap();
int id = -1;
for (String t : topicRouteMap.keySet()) {
for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
index 762c7a0..c1d5020 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -52,7 +52,11 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
String filter = entry.getValue();
Map<String, String> tableMap = new HashMap<>();
tableMap.put(tableKey, filter);
- taskTopicList.get(ind).put(dbKey, tableMap);
+ if(!taskTopicList.get(ind).containsKey(dbKey)){
+ taskTopicList.get(ind).put(dbKey, tableMap);
+ }else {
+ taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+ }
}
for (int i = 0; i < parallelism; i++) {
@@ -77,11 +81,13 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
int parallelism = tdc.getTaskParallelism();
int id = -1;
Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
- Map<Integer, String> taskTopicList = new HashMap<>();
+ Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
for (String topicName : topicRouteSet) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
- taskTopicList.put(ind, topicName);
+ taskTopicList.put(ind, new StringBuilder(topicName));
+ }else {
+ taskTopicList.get(ind).append(",").append(topicName);
}
}
@@ -91,7 +97,7 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
- keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(taskTopicList.get(i)));
+ keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());