You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:28 UTC
[rocketmq-flink] 15/33: create client instance for each sink/source
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit fc793137cc1808b422c2285e5a5353c685e57bac
Author: zhu zhengwen <zh...@xiaohongshu.com>
AuthorDate: Fri Jun 21 15:54:16 2019 +0800
create client instance for each sink/source
---
src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 3 ++-
src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 7 ++-----
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index 41bbcbe..ca6848d 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.UUID;
import org.apache.commons.lang.Validate;
import org.apache.flink.configuration.Configuration;
@@ -87,7 +88,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
producer = new DefaultMQProducer();
- producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+ producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
RocketMQConfig.buildProducerConfigs(props, producer);
batchList = new LinkedList<>();
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 14c479b..dcb1d31 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -19,10 +19,7 @@
package org.apache.rocketmq.flink;
import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.Validate;
@@ -115,7 +112,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
- consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+ consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
RocketMQConfig.buildConsumerConfigs(props, consumer);
}