Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/22 08:17:46 UTC

[GitHub] [rocketmq-connect] Slideee opened a new pull request, #117: [ISSUE #116] replicator adapt to the new connect api

Slideee opened a new pull request, #117:
URL: https://github.com/apache/rocketmq-connect/pull/117

   ## What is the purpose of the change
   
   (https://github.com/apache/rocketmq-connect/issues/116)
   
   ## Brief changelog
   
   Adapt the replicator to the new connect API.
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist items (the last one is optional) before requesting the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq-connect/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; mocking is preferable when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857081307


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -164,6 +166,11 @@ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<T
         return true;
     }
 
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   > The config should be initialized in init.
   
   For the Replicator, the validate method already initializes the config, so it is not initialized again in the init method.





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867315636


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   > There is no problem with multiple topics in one task. The main point is that the configuration of the number of tasks should be unified. Basically, all connectors need this configuration.
   
   I have submitted a version; can you review it?





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857079826


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   For replicators, taskParallelism is used to control the number of tasks. The taskParallelism field and maxTasks are semantically identical, so nothing separate is defined for maxTasks.





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r860515988


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   Under the previous version of the connect API, a replicator that wanted to support multiple tasks had to provide the number of tasks itself. However, we found that multi-task support is a general capability, and the replicator should not define its own configuration for it. Almost every specific connector implementation has this need, so the configuration that specifies the number of tasks should be unified. Using the maxTasks provided by the API is the easier way to unify this capability. The connector shards the tasks according to the sharding of the specific storage, and the actual number of task shards should be, for example when dividing by topic, min(the number of tasks divided by topic, maxTasks).





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857080985


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java:
##########
@@ -117,38 +128,19 @@ public void start(KeyValue config) {
             }
 
             for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
-
                 MessageQueue mq = offsetTable.getKey();
                 long srcOffset = offsetTable.getValue().getConsumerOffset();
                 long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset);
 
+                List<Field> fields = new ArrayList<Field>();
+                Schema schema = new Schema(SchemaEnum.OFFSET.name(), FieldType.STRING, fields);
+                schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), SchemaBuilder.string().build()));

Review Comment:
   > Using INT64 for the offset may be better.
   
   done



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {

Review Comment:
   > I think the exception should be thrown upward. Because the method has no return value, the upper layer cannot otherwise determine whether validation passed; the exception is swallowed, and with no exception log the problem is difficult to diagnose.
   
   done





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867314361


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   There is no problem with multiple topics in one task. The main point is that the configuration of the number of tasks should be unified. Basically, all connectors need this configuration.





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857079900


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   > maxTasks can specify the number of tasks. Can you consider whether it needs to be divided into multiple tasks?
   
   For replicators, taskParallelism is used to control the number of tasks. The taskParallelism field and maxTasks are semantically identical, so nothing separate is defined for maxTasks.





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867663143


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java:
##########
@@ -25,7 +25,7 @@
 import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
 
 public class RmqConnectorConfig {
-
+    @Deprecated
     private int taskParallelism;

Review Comment:
   Is it possible to delete this property directly?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java:
##########
@@ -48,7 +48,7 @@ public class RmqConnectorConfig {
     public RmqConnectorConfig() {
     }
 
-    public void validate(KeyValue config) {
+    public void init(KeyValue config) {
         this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);

Review Comment:
   Is it possible to delete this property directly?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java:
##########
@@ -28,6 +28,7 @@ public class TaskDivideConfig {
 
     private int dataType;
 
+    @Deprecated
     private int taskParallelism;

Review Comment:
   Is it possible to delete this property directly?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -139,7 +153,7 @@ public List<KeyValue> taskConfigs() {
             e.printStackTrace();
         }
 
-        return Utils.groupPartitions(new ArrayList<>(this.knownGroups), this.replicatorConfig.getTaskParallelism(), replicatorConfig);
+        return Utils.groupPartitions(new ArrayList<>(this.knownGroups), this.replicatorConfig.getTaskParallelism(), replicatorConfig, maxTasks);

Review Comment:
   Is it possible to delete  taskParallelism directly?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java:
##########
@@ -31,10 +31,11 @@
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   Is it possible to delete taskParallelism directly?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java:
##########
@@ -32,10 +32,11 @@
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   Is it possible to delete taskParallelism directly?





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867674521


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java:
##########
@@ -32,10 +32,11 @@
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   > Is it possible to delete taskParallelism directly?
   
   When dividing by hash, by queue, or by group, should the maxTasks parameter be used directly to control the number of tasks?
   
   Or:
   
   if dividing by hash, int parallelism = maxTasks
   if dividing by group, int parallelism = min(groupNums, maxTasks)
   if dividing by queue, int parallelism = min(queueNums, maxTasks)
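
The second option in the comment above can be sketched roughly as follows. This is only an illustration, not code from the PR: the class `ParallelismSketch`, the `Strategy` enum, and the `shardCount` parameter (standing in for groupNums or queueNums) are hypothetical names introduced here; only `maxTasks` and the three rules come from the comment.

```java
final class ParallelismSketch {

    // Hypothetical enum naming the three divide strategies mentioned in the comment.
    enum Strategy { BY_HASH, BY_GROUP, BY_QUEUE }

    // shardCount is the number of groups or queues; it is ignored when dividing by hash.
    static int parallelism(Strategy strategy, int shardCount, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        switch (strategy) {
            case BY_HASH:
                // Hash division is bounded only by maxTasks.
                return maxTasks;
            case BY_GROUP:
            case BY_QUEUE:
                // Never create more tasks than there are groups or queues.
                return Math.min(shardCount, maxTasks);
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelism(Strategy.BY_HASH, 0, 4));
        System.out.println(parallelism(Strategy.BY_GROUP, 2, 4));
        System.out.println(parallelism(Strategy.BY_QUEUE, 16, 4));
    }
}
```

Keeping the rule in one place like this would let each strategy class receive an already-bounded value, although the PR itself passes `maxTasks` into each `divide` method.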





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867377167


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java:
##########
@@ -28,10 +28,12 @@
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc,
+        int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   > 1. Maybe ConfigDefine.CONN_TASK_PARALLELISM should be deprecated: `this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);`
   > 2. If dividing by topic: `int parallelism = Math.min(topicNum, maxTasks);`
   
   So you mean:
   if dividing by queue, int parallelism = Math.min(queueNum, maxTasks)
   if dividing by group, int parallelism = Math.min(groupNum, maxTasks)
   Is that right?
   In that case, is the number of hash nodes still compared with taskParallelism and maxTasks?





[GitHub] [rocketmq-connect] odbozhou merged pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou merged PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117




[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857081286


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;
             }
         }
 
         try {
             this.replicatorConfig.validate(config);
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;

Review Comment:
   > The exception should be thrown.
   
   done





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857070808


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java:
##########
@@ -117,38 +128,19 @@ public void start(KeyValue config) {
             }
 
             for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
-
                 MessageQueue mq = offsetTable.getKey();
                 long srcOffset = offsetTable.getValue().getConsumerOffset();
                 long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset);
 
+                List<Field> fields = new ArrayList<Field>();
+                Schema schema = new Schema(SchemaEnum.OFFSET.name(), FieldType.STRING, fields);
+                schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), SchemaBuilder.string().build()));

Review Comment:
   Using INT64 for the offset may be better.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;

Review Comment:
   A missing configuration key should throw an exception instead of returning directly. Returning directly indicates that the verification has passed.
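
A minimal sketch of the behavior requested above, assuming a plain `Map` in place of the connect API's `KeyValue` type. The class `ValidateSketch` and the key names in `REQUIRED_KEYS` are hypothetical; in the PR the required keys come from `ConfigDefine.REQUEST_CONFIG`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ValidateSketch {

    // Hypothetical required keys, used only for this illustration.
    static final List<String> REQUIRED_KEYS = Arrays.asList("example.source", "example.target");

    // A void validate method can only signal failure by throwing,
    // so a missing key raises an exception instead of returning silently.
    static void validate(Map<String, String> config) {
        for (String key : REQUIRED_KEYS) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException("Required config key is missing: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("example.source", "source-cluster");
        try {
            validate(config);
            System.out.println("validation passed");
        } catch (IllegalArgumentException e) {
            System.out.println("validation failed: " + e.getMessage());
        }
    }
}
```

With this shape the caller can distinguish a passed validation from a failed one, which a bare `return` in a `void` method does not allow.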



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;
         }
         this.prepare();
-        this.configValid = true;
-        return "";
+        return;
+    }
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   The config should be set in init.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {

Review Comment:
   I think the exception should be thrown upward. Because the method has no return value, the upper layer cannot otherwise determine whether validation passed; the exception is swallowed, and with no exception log the problem is difficult to diagnose.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -164,6 +166,11 @@ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<T
         return true;
     }
 
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   The config should be initialized in init.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   maxTasks can specify the number of tasks. Can you consider whether it needs to be divided into multiple tasks?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;
             }
         }
 
         try {
             this.replicatorConfig.validate(config);
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;

Review Comment:
   The exception should be thrown.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
         }
 
         buildRoute();
-        startListner();
+        startListener();

Review Comment:
   startListener may not be needed, because the runtime already performs the load balancing and puts the load-balanced queues in the context of each task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r861482464


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   1. Yes, maxTasks should replace taskParallelism.
   2. min(number of shards, maxTasks) should also be implemented. This is also easier to understand: if there are only 2 shards and maxTasks=5, the number of tasks can only be 2, which is why the parameter is called maxTasks instead of taskNum. For example, if the data source has only 2 topics (or 2 tables), maxTasks=5, and the shard dimension is topic (or table), the final number of tasks is 2, that is, min(topicNum, maxTasks).
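
The example in point 2 can be sketched as below. This is an illustration under stated assumptions, not the replicator's actual divide logic: the class `TopicAssignmentSketch`, the `assign` method, and the round-robin distribution are all hypothetical choices made here; only the min(topicNum, maxTasks) rule comes from the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TopicAssignmentSketch {

    // Splits topics across at most maxTasks tasks, never creating an empty task.
    static List<List<String>> assign(List<String> topics, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        // The rule from the review: min(topicNum, maxTasks).
        int taskCount = Math.min(topics.size(), maxTasks);
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(new ArrayList<>());
        }
        // Round-robin distribution (an assumption made for this sketch).
        for (int i = 0; i < topics.size(); i++) {
            tasks.get(i % taskCount).add(topics.get(i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        // 2 topics with maxTasks=5 yields 2 tasks, as in the comment.
        System.out.println(assign(Arrays.asList("topicA", "topicB"), 5));
        // 5 topics with maxTasks=2 yields 2 tasks holding several topics each.
        System.out.println(assign(Arrays.asList("t1", "t2", "t3", "t4", "t5"), 2));
    }
}
```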





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r859415860


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
         }
 
         buildRoute();
-        startListner();
+        startListener();

Review Comment:
   I made a mistake. The source connector does need it to detect changes to the source MQ topics. It is not necessary on the sink side.





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867377768


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java:
##########
@@ -28,10 +28,12 @@
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc,
+        int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   > 1. Maybe ConfigDefine.CONN_TASK_PARALLELISM should be deprecated: `this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);`
   > 2. If dividing by topic: `int parallelism = Math.min(topicNum, maxTasks);`
   
   done





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r859455150


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
         }
 
         buildRoute();
-        startListner();
+        startListener();

Review Comment:
   > I made a mistake. The source connector does need to detect changes to the source MQ topics. That is not necessary on the sink side.
   
   This method is in the source connector, so what do you mean by the sink side? Also, could you review the latest commits? :)





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857080938


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
         }
 
         buildRoute();
-        startListner();
+        startListener();

Review Comment:
   > startListener may not be needed, because the runtime already handles load balancing and puts the load-balanced queues in the context of each task.
   
   I don't think it should be deleted. The startListener method tracks changes in topic information, while the runtime tracks changes in task configuration and load balancing. Changes to a topic should not be tracked by the runtime :)





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867674521


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java:
##########
@@ -32,10 +32,11 @@
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc, int maxTasks) {
 
         List<KeyValue> config = new ArrayList<>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   > Is it possible to delete taskParallelism directly?
   
   So when dividing by hash, by queue, or by group, should we use the maxTasks parameter directly to control the number of tasks?
   
   





[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867317381


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java:
##########
@@ -28,10 +28,12 @@
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc,
+        int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   1. Maybe ConfigDefine.CONN_TASK_PARALLELISM should be deprecated:
      this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
   2. If dividing by topic:
      int parallelism = Math.min(topicNum, maxTasks);
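
The second point above can be sketched roughly as follows. This is a minimal illustration, not code from the replicator: the class `ParallelismSketch` and the method `effectiveParallelism` are hypothetical names, and returning at least 1 is an assumption made here so that a connector always gets one task.

```java
// Hypothetical helper for illustration; not part of rocketmq-replicator.
final class ParallelismSketch {

    private ParallelismSketch() {
    }

    /**
     * Caps the number of natural shards (topics, queues, groups, ...)
     * at the maxTasks value handed to taskConfigs(int maxTasks).
     * Assumption: the result is never below 1.
     */
    static int effectiveParallelism(int shardCount, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be >= 1, got " + maxTasks);
        }
        return Math.max(1, Math.min(shardCount, maxTasks));
    }
}
```

With this shape, dividing by topic would pass the topic count as `shardCount`, and other strategies would pass their own shard count.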





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r867377167


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java:
##########
@@ -28,10 +28,12 @@
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override
+    public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc,
+        int maxTasks) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-        int parallelism = tdc.getTaskParallelism();
+        int parallelism = Math.min(tdc.getTaskParallelism(), maxTasks);

Review Comment:
   > 1. Maybe ConfigDefine.CONN_TASK_PARALLELISM should be deprecated:
   >    this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
   > 2. If dividing by topic:
   >    int parallelism = Math.min(topicNum, maxTasks);
   
   So you mean:
   if dividing by queue, int parallelism = Math.min(queueNum, maxTasks);
   if dividing by group, int parallelism = Math.min(groupNum, maxTasks).
   Is that right?
   Then, when dividing by hash, is the number of hash nodes still determined by comparing taskParallelism with maxTasks?





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r860549279


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   > Under the previous version of the connect API, if the replicator wanted to support multiple tasks, only the replicator itself could supply the number of tasks. We found, however, that multi-task support is a general need: almost every concrete connector implementation has it. The replicator should therefore not define its own configuration for it, and the setting for the number of tasks should be unified. Using the maxTasks provided by the API is an easier way to unify this capability. The connector shards its tasks according to how the underlying storage is sharded. When dividing by topic, for example, the actual number of task shards should be min(the number of tasks divided by topic, maxTasks).
   
   Then I think the taskParallelism parameter should be removed, with maxTasks used in its place to control the number of tasks. Otherwise, writing something like min(the number of tasks divided by topic, maxTasks) will only increase the cost of understanding. What do you think?





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r861488501


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   > 
   
   I understand what you mean, but the current replicator uses taskParallelism to do the sharding. In the case you describe, with 2 topics and taskParallelism set to 1, both topics end up on 1 task (not 2 tasks). So taskParallelism could be retained, and the logic becomes taskParallelism = min(taskParallelism, maxTasks).
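
To make the 2-topics, 1-task case concrete, here is a rough sketch. It assumes a simple round-robin assignment, which may differ from what DivideTaskByTopic actually does; `TopicAssignmentSketch` and `assign` are hypothetical names, and plain topic names stand in for TaskTopicInfo.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the replicator's real divide strategy.
final class TopicAssignmentSketch {

    private TopicAssignmentSketch() {
    }

    /** Spreads topics round-robin over min(taskParallelism, maxTasks) tasks. */
    static List<List<String>> assign(List<String> topics, int taskParallelism, int maxTasks) {
        int parallelism = Math.min(taskParallelism, maxTasks);
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1, got " + parallelism);
        }
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new ArrayList<>());
        }
        // Topic i goes to task (i mod parallelism).
        for (int i = 0; i < topics.size(); i++) {
            tasks.get(i % parallelism).add(topics.get(i));
        }
        return tasks;
    }
}
```

Under these assumptions, two topics with taskParallelism 1 yield a single task holding both topics, whatever the value of maxTasks.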





[GitHub] [rocketmq-connect] Slideee commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

Posted by GitBox <gi...@apache.org>.
Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857081213


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;
         }
         this.prepare();
-        this.configValid = true;
-        return "";
+        return;
+    }
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   > config should be set by init
   
   For the replicator, the validate method already initializes the config, so it is not initialized again in the init method.



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;

Review Comment:
   > A missing configuration key should throw an exception instead of returning directly. Returning directly indicates that verification has passed.
   
   done
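
For readers following along, the requested behavior might look roughly like the sketch below. It is an assumption-laden illustration rather than the code that was committed: a plain `Map` stands in for the connect API's KeyValue, the key names are made up (the real list is ConfigDefine.REQUEST_CONFIG), and IllegalArgumentException is chosen only because the diff above already catches that type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not the committed validate() implementation.
final class RequiredConfigSketch {

    // Made-up keys standing in for ConfigDefine.REQUEST_CONFIG.
    static final List<String> REQUIRED_KEYS = Arrays.asList("source-rocketmq", "target-rocketmq");

    private RequiredConfigSketch() {
    }

    /** Throws instead of returning silently, so a missing key cannot pass validation. */
    static void validate(Map<String, String> config) {
        for (String key : REQUIRED_KEYS) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException("Missing required config key: " + key);
            }
        }
    }
}
```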


