You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:28 UTC

[rocketmq-flink] 15/33: create client instance for each sink/source

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit fc793137cc1808b422c2285e5a5353c685e57bac
Author: zhu zhengwen <zh...@xiaohongshu.com>
AuthorDate: Fri Jun 21 15:54:16 2019 +0800

    Create a client instance for each sink/source (append a random UUID to the instance name so each one is unique)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java   | 3 ++-
 src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 7 ++-----
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index 41bbcbe..ca6848d 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
 
 import org.apache.commons.lang.Validate;
 import org.apache.flink.configuration.Configuration;
@@ -87,7 +88,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
 
         producer = new DefaultMQProducer();
-        producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+        producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
         RocketMQConfig.buildProducerConfigs(props, producer);
 
         batchList = new LinkedList<>();
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 14c479b..dcb1d31 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -19,10 +19,7 @@
 package org.apache.rocketmq.flink;
 
 import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.Validate;
@@ -115,7 +112,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
         consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
 
-        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
         RocketMQConfig.buildConsumerConfigs(props, consumer);
     }