You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:48 UTC

[rocketmq-connect] 40/43: [ISSUE #570] ASoC connect runtime optimization: CLI (#622)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit b425260296d1524d61643b3e286d395df1ed5835
Author: Dreaouth <jl...@163.com>
AuthorDate: Sun Sep 20 19:09:29 2020 +0800

    [ISSUE #570] ASoC connect runtime optimization: CLI (#622)
    
    feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime
    
    * Add CLI
    
    * Fix checkstyle
    
    * Optimize CLI structure
    
    * Add README.md
    
    * Rename CLI
    
    * Update pom.xml
    
    * Optimize the connectors and tasks format
    
    * Fix newline format
---
 .../connect/jdbc/connector/JdbcSourceConnector.java     |  2 +-
 .../connect/jdbc/strategy/DivideTaskByTopic.java        | 17 ++++++++++-------
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index a083e84..ee62133 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -59,7 +59,7 @@ public class JdbcSourceConnector extends SourceConnector {
 
     @Override
     public void start() {
-
+        log.info("JdbcSourceConnector start");
     }
 
     @Override
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
index c1d5020..5762795 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -41,21 +41,23 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
         int parallelism = tdc.getTaskParallelism();
         int id = -1;
         Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
-        Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+        Map<Integer, String> taskTopicList = new HashMap<>();
+        Map<Integer, Map<String, Map<String, String>>> taskWhiteList = new HashMap<>();
         for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
             int ind = ++id % parallelism;
-            if (!taskTopicList.containsKey(ind)) {
-                taskTopicList.put(ind, new HashMap<>());
+            if (!taskWhiteList.containsKey(ind)) {
+                taskWhiteList.put(ind, new HashMap<>());
             }
             String dbKey = entry.getKey().split("-")[0];
             String tableKey = entry.getKey().split("-")[1];
+            taskTopicList.put(ind, tableKey);
             String filter = entry.getValue();
             Map<String, String> tableMap = new HashMap<>();
             tableMap.put(tableKey, filter);
-            if(!taskTopicList.get(ind).containsKey(dbKey)){
-                taskTopicList.get(ind).put(dbKey, tableMap);
+            if(!taskWhiteList.get(ind).containsKey(dbKey)){
+                taskWhiteList.get(ind).put(dbKey, tableMap);
             }else {
-                taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+                taskWhiteList.get(ind).get(dbKey).putAll(tableMap);
             }
         }
 
@@ -66,7 +68,8 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
             keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
             keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
             keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
-            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskWhiteList.get(i)));
+            keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i));
             keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
             keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
             keyValue.put(Config.CONN_DB_MODE, tdc.getMode());