You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/29 07:34:14 UTC

[inlong] branch release-1.3.0 updated: [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new f92f53513 [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
f92f53513 is described below

commit f92f535134268bd5730a37adea7cb038f38e4b78
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Sep 29 15:09:33 2022 +0800

    [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
---
 .../inlong/dataproxy/config/ConfigManager.java     |  6 ++++--
 .../dataproxy/sink/pulsar/PulsarClientService.java | 25 ++++++++++++++++------
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 554881448..fa05427b9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
@@ -55,6 +56,7 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_IN
  * Config manager class.
  */
 public class ConfigManager {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
 
     public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
     private static volatile boolean isInit = false;
@@ -157,7 +159,8 @@ public class ConfigManager {
         // add new configure records
         for (Map.Entry<String, String> entry : result.entrySet()) {
             String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
-            if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+            if ((entry.getValue() == null && !Objects.equals("null", oldValue))
+                    || (entry.getValue() != null && !Objects.equals(entry.getValue(), oldValue))) {
                 changed = true;
             }
         }
@@ -256,7 +259,6 @@ public class ConfigManager {
      */
     public static class ReloadConfigWorker extends Thread {
 
-        private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
         private final ConfigManager configManager;
         private final CloseableHttpClient httpClient;
         private final Gson gson = new Gson();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index bf4558313..66b06e93d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.pulsar;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -363,10 +364,15 @@ public class PulsarClientService {
 
     private void removeProducers(PulsarClient pulsarClient) {
         for (List<TopicProducerInfo> producers : producerInfoMap.values()) {
-            for (TopicProducerInfo topicProducer : producers) {
-                if (topicProducer.getPulsarClient().equals(pulsarClient)) {
-                    topicProducer.close();
-                    producers.remove(topicProducer);
+            if (producers == null || producers.isEmpty()) {
+                continue;
+            }
+            Iterator<TopicProducerInfo> it = producers.iterator();
+            while (it.hasNext()) {
+                TopicProducerInfo entry = it.next();
+                if (entry.getPulsarClient().equals(pulsarClient)) {
+                    entry.close();
+                    it.remove();
                 }
             }
         }
@@ -412,10 +418,17 @@ public class PulsarClientService {
                             topic);
                     info.initProducer();
                     if (info.isCanUseToSendMessage()) {
-                        producerInfoMap.computeIfAbsent(topic, k -> new ArrayList<>()).add(info);
+                        List<TopicProducerInfo> producerInfos = producerInfoMap.get(topic);
+                        if (producerInfos == null) {
+                            List<TopicProducerInfo> tmpProdInfos = new ArrayList<>();
+                            producerInfos = producerInfoMap.putIfAbsent(topic, tmpProdInfos);
+                            if (producerInfos == null) {
+                                producerInfos = tmpProdInfos;
+                            }
+                        }
+                        producerInfos.add(info);
                     }
                 }
-
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(url);
                 logger.error("create connection error in pulsar sink, "