You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/30 02:41:01 UTC
[inlong] branch release-1.3.0 updated: [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 9e7943edb [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
9e7943edb is described below
commit 9e7943edb35eda6d05471c054c3ac888cc56a95d
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 30 10:27:54 2022 +0800
[INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
Co-authored-by: healchow <he...@gmail.com>
---
.../apache/inlong/dataproxy/sink/PulsarSink.java | 25 +++++++++++++++++-----
.../dataproxy/sink/pulsar/PulsarClientService.java | 14 ++++++++++++
2 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index cac638ce5..c2014965d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -246,21 +246,36 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
/**
* When topic.properties is re-enabled, the producer update is triggered
*/
- public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet, Set<String> endSet) {
+ public void diffSetPublish(PulsarClientService pulsarClientService,
+ Set<String> curTopicSet, Set<String> newTopicSet) {
boolean changed = false;
- for (String s : endSet) {
- if (!originalSet.contains(s)) {
+ // create producers for new topics
+ for (String newTopic : newTopicSet) {
+ if (!curTopicSet.contains(newTopic)) {
changed = true;
try {
- pulsarClientService.initTopicProducer(s);
+ pulsarClientService.initTopicProducer(newTopic);
} catch (Exception e) {
logger.error("get producer failed: ", e);
}
}
}
+ // remove producers for deleted topics
+ for (String oldTopic : curTopicSet) {
+ if (!newTopicSet.contains(oldTopic)) {
+ changed = true;
+ try {
+ pulsarClientService.destroyProducerByTopic(oldTopic);
+ } catch (Exception e) {
+ logger.error("remove producer failed: ", e);
+ }
+ }
+ }
if (changed) {
- logger.info("topics.properties has changed, trigger diff publish for {}", getName());
topicProperties = configManager.getTopicProperties();
+ logger.info("topics.properties has changed, trigger diff publish for {},"
+ + " old topic set = {}, new topic set = {}, current topicProperties = {}",
+ getName(), curTopicSet, newTopicSet, topicProperties);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 66b06e93d..dc0a87b07 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -323,6 +323,20 @@ public class PulsarClientService {
return initTopicProducer(topic, null, null);
}
+ public boolean destroyProducerByTopic(String topic) {
+ List<TopicProducerInfo> producerInfoList = producerInfoMap.remove(topic);
+ if (producerInfoList == null || producerInfoList.isEmpty()) {
+ return true;
+ }
+ for (TopicProducerInfo producerInfo : producerInfoList) {
+ if (producerInfo != null) {
+ producerInfo.close();
+ logger.info("destroy producer for topic={}", topic);
+ }
+ }
+ return true;
+ }
+
private TopicProducerInfo getProducerInfo(int poolIndex, String topic, String inlongGroupId,
String inlongStreamId) {
List<TopicProducerInfo> producerList = initTopicProducer(topic, inlongGroupId, inlongStreamId);