You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/30 02:41:01 UTC

[inlong] branch release-1.3.0 updated: [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 9e7943edb [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
9e7943edb is described below

commit 9e7943edb35eda6d05471c054c3ac888cc56a95d
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 30 10:27:54 2022 +0800

    [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 25 +++++++++++++++++-----
 .../dataproxy/sink/pulsar/PulsarClientService.java | 14 ++++++++++++
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index cac638ce5..c2014965d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -246,21 +246,36 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
     /**
      * When topic.properties is re-enabled, the producer update is triggered
      */
-    public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet, Set<String> endSet) {
+    public void diffSetPublish(PulsarClientService pulsarClientService,
+                               Set<String> curTopicSet, Set<String> newTopicSet) {
         boolean changed = false;
-        for (String s : endSet) {
-            if (!originalSet.contains(s)) {
+        // create producers for new topics
+        for (String newTopic : newTopicSet) {
+            if (!curTopicSet.contains(newTopic)) {
                 changed = true;
                 try {
-                    pulsarClientService.initTopicProducer(s);
+                    pulsarClientService.initTopicProducer(newTopic);
                 } catch (Exception e) {
                     logger.error("get producer failed: ", e);
                 }
             }
         }
+        // remove producers for deleted topics
+        for (String oldTopic : curTopicSet) {
+            if (!newTopicSet.contains(oldTopic)) {
+                changed = true;
+                try {
+                    pulsarClientService.destroyProducerByTopic(oldTopic);
+                } catch (Exception e) {
+                    logger.error("remove producer failed: ", e);
+                }
+            }
+        }
         if (changed) {
-            logger.info("topics.properties has changed, trigger diff publish for {}", getName());
             topicProperties = configManager.getTopicProperties();
+            logger.info("topics.properties has changed, trigger diff publish for {},"
+                    + " old topic set = {}, new topic set = {}, current topicProperties = {}",
+                    getName(), curTopicSet, newTopicSet, topicProperties);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 66b06e93d..dc0a87b07 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -323,6 +323,20 @@ public class PulsarClientService {
         return initTopicProducer(topic, null, null);
     }
 
+    public boolean destroyProducerByTopic(String topic) {
+        List<TopicProducerInfo> producerInfoList = producerInfoMap.remove(topic);
+        if (producerInfoList == null || producerInfoList.isEmpty()) {
+            return true;
+        }
+        for (TopicProducerInfo producerInfo : producerInfoList) {
+            if (producerInfo != null) {
+                producerInfo.close();
+                logger.info("destroy producer for topic={}", topic);
+            }
+        }
+        return true;
+    }
+
     private TopicProducerInfo getProducerInfo(int poolIndex, String topic, String inlongGroupId,
             String inlongStreamId) {
         List<TopicProducerInfo> producerList = initTopicProducer(topic, inlongGroupId, inlongStreamId);