You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:14 UTC

[rocketmq-connect] 04/39: [ISSUE #351] rmq client setInstanceName on sourceTask (#355)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit c7a2c41c5abfafe41d89b912452791787e11539a
Author: SanChen <sa...@chenyiliu.com>
AuthorDate: Wed Aug 7 11:00:16 2019 +0800

    [ISSUE #351] rmq client setInstanceName on sourceTask (#355)
---
 .../apache/rocketmq/connector/RmqSourceConnector.java    |  3 ++-
 .../org/apache/rocketmq/connector/RmqSourceTask.java     |  1 +
 .../java/org/apache/rocketmq/connector/common/Utils.java | 16 ++++++++++++++++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
index 2b5bda4..33eb958 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -111,7 +111,8 @@ public class RmqSourceConnector extends SourceConnector {
             RPCHook rpcHook = null;
             this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
             this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
-            this.defaultMQAdminExt.setInstanceName(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+            this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+            this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ)));
             try {
                 defaultMQAdminExt.start();
             } catch (MQClientException e) {
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
index 4fca698..7b3b011 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -71,6 +71,7 @@ public class RmqSourceTask extends SourceTask {
         ConfigUtil.load(config, this.config);
         this.consumer.setConsumerGroup(this.taskId);
         this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+        this.consumer.setInstanceName(Utils.createInstanceName(this.config.getSourceRocketmq()));
         List<TaskTopicInfo> topicList = JSONObject.parseArray(this.config.getTaskTopicList(), TaskTopicInfo.class);
 
         try {
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
index f5822c6..30fe44c 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -16,6 +16,10 @@
  */
 package org.apache.rocketmq.connector.common;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class Utils {
 
     public static String createGroupName(String prefix) {
@@ -29,4 +33,16 @@ public class Utils {
     public static String createTaskId(String prefix) {
         return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
     }
+
+    public static String createInstanceName(String namesrvAddr) { // derives an instance name from a ';'-separated address string
+        String[] namesrvArray = namesrvAddr.split(";"); // entries are not trimmed; a null argument throws NullPointerException here
+        List<String> namesrvList = new ArrayList<String>(); // collects each distinct entry once
+        for (String ns: namesrvArray) {
+            if (!namesrvList.contains(ns)) { // skip entries already collected (de-duplication)
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList); // sort so the result does not depend on the order the entries were listed in
+        return String.valueOf(namesrvList.toString().hashCode()); // decimal text of the hash of the list's "[a, b]" form; may be negative
+    }
 }