You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:14 UTC
[rocketmq-connect] 04/39: [ISSUE #351] rmq client setInstanceName on sourceTask (#355)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit c7a2c41c5abfafe41d89b912452791787e11539a
Author: SanChen <sa...@chenyiliu.com>
AuthorDate: Wed Aug 7 11:00:16 2019 +0800
[ISSUE #351] rmq client setInstanceName on sourceTask (#355)
---
.../apache/rocketmq/connector/RmqSourceConnector.java | 3 ++-
.../org/apache/rocketmq/connector/RmqSourceTask.java | 1 +
.../java/org/apache/rocketmq/connector/common/Utils.java | 16 ++++++++++++++++
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
index 2b5bda4..33eb958 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -111,7 +111,8 @@ public class RmqSourceConnector extends SourceConnector {
RPCHook rpcHook = null;
this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
- this.defaultMQAdminExt.setInstanceName(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ)));
try {
defaultMQAdminExt.start();
} catch (MQClientException e) {
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
index 4fca698..7b3b011 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -71,6 +71,7 @@ public class RmqSourceTask extends SourceTask {
ConfigUtil.load(config, this.config);
this.consumer.setConsumerGroup(this.taskId);
this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+ this.consumer.setInstanceName(Utils.createInstanceName(this.config.getSourceRocketmq()));
List<TaskTopicInfo> topicList = JSONObject.parseArray(this.config.getTaskTopicList(), TaskTopicInfo.class);
try {
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
index f5822c6..30fe44c 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.connector.common;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class Utils {
public static String createGroupName(String prefix) {
@@ -29,4 +33,16 @@ public class Utils {
public static String createTaskId(String prefix) {
return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
}
+
+ public static String createInstanceName(String namesrvAddr) {
+ String[] namesrvArray = namesrvAddr.split(";");
+ List<String> namesrvList = new ArrayList<String>();
+ for (String ns: namesrvArray) {
+ if (!namesrvList.contains(ns)) {
+ namesrvList.add(ns);
+ }
+ }
+ Collections.sort(namesrvList);
+ return String.valueOf(namesrvList.toString().hashCode());
+ }
}