You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/24 06:21:27 UTC

[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #117: [ISSUE #116] replicator adapt to the new connect api

odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857070808


##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java:
##########
@@ -117,38 +128,19 @@ public void start(KeyValue config) {
             }
 
             for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
-
                 MessageQueue mq = offsetTable.getKey();
                 long srcOffset = offsetTable.getValue().getConsumerOffset();
                 long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset);
 
+                List<Field> fields = new ArrayList<Field>();
+                Schema schema = new Schema(SchemaEnum.OFFSET.name(), FieldType.STRING, fields);
+                schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), SchemaBuilder.string().build()));

Review Comment:
   offset use INT64  may be better



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;

Review Comment:
   Missing configuration should throw an exception instead of returning directly. Returning directly indicates that the verification has passed



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;
         }
         this.prepare();
-        this.configValid = true;
-        return "";
+        return;
+    }
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   config should be set by init 



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
-    @Override public String verifyAndSetConfig(KeyValue config) {
+    @Override public void validate(KeyValue config) {
         log.info("verifyAndSetConfig...");
         try {
             replicatorConfig.validate(config);
+            this.configValid = true;
         } catch (IllegalArgumentException e) {

Review Comment:
   I think the exception can be thrown upward, because there is no return, the upper layer cannot determine whether the verification is passed, otherwise the exception will be replenished, and there is no exception log, it is difficult to diagnose the problem



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -164,6 +166,11 @@ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<T
         return true;
     }
 
+
+    @Override public void init(KeyValue config) {
+

Review Comment:
   config should be init by init



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         if (!configValid) {

Review Comment:
   maxTasks can specify the number of tasks. Can you consider whether it needs to be divided into multiple tasks?



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws MQClientException {
     }
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void validate(KeyValue config) {
 
         // Check the need key.
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                return;
             }
         }
 
         try {
             this.replicatorConfig.validate(config);
         } catch (IllegalArgumentException e) {
-            return e.getMessage();
+            return;

Review Comment:
   Exception should be throw



##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
         }
 
         buildRoute();
-        startListner();
+        startListener();

Review Comment:
   startListener may not be needed, because the runtime has already done the load balancing logic, and put the load balancing queue in the context of each task



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org