You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:48 UTC
[rocketmq-connect] 40/43: [ISSUE #570] ASoC connect runtime optimization: CLI (#622)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit b425260296d1524d61643b3e286d395df1ed5835
Author: Dreaouth <jl...@163.com>
AuthorDate: Sun Sep 20 19:09:29 2020 +0800
[ISSUE #570] ASoC connect runtime optimization: CLI (#622)
feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime
* Add CLI
* Fix checkstyle
* Optimize CLI structure
* Add README.md
* Rename CLI
* Update pom.xml
* Optimize the connectors and tasks format
* Fix newline format
---
.../connect/jdbc/connector/JdbcSourceConnector.java | 2 +-
.../connect/jdbc/strategy/DivideTaskByTopic.java | 17 ++++++++++-------
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index a083e84..ee62133 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -59,7 +59,7 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public void start() {
-
+ log.info("JdbcSourceConnector start");
}
@Override
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
index c1d5020..5762795 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -41,21 +41,23 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
- Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+ Map<Integer, String> taskTopicList = new HashMap<>();
+ Map<Integer, Map<String, Map<String, String>>> taskWhiteList = new HashMap<>();
for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
int ind = ++id % parallelism;
- if (!taskTopicList.containsKey(ind)) {
- taskTopicList.put(ind, new HashMap<>());
+ if (!taskWhiteList.containsKey(ind)) {
+ taskWhiteList.put(ind, new HashMap<>());
}
String dbKey = entry.getKey().split("-")[0];
String tableKey = entry.getKey().split("-")[1];
+ taskTopicList.put(ind, tableKey);
String filter = entry.getValue();
Map<String, String> tableMap = new HashMap<>();
tableMap.put(tableKey, filter);
- if(!taskTopicList.get(ind).containsKey(dbKey)){
- taskTopicList.get(ind).put(dbKey, tableMap);
+ if(!taskWhiteList.get(ind).containsKey(dbKey)){
+ taskWhiteList.get(ind).put(dbKey, tableMap);
}else {
- taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+ taskWhiteList.get(ind).get(dbKey).putAll(tableMap);
}
}
@@ -66,7 +68,8 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
- keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+ keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskWhiteList.get(i)));
+ keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i));
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());