You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/29 07:34:14 UTC
[inlong] branch release-1.3.0 updated: [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new f92f53513 [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
f92f53513 is described below
commit f92f535134268bd5730a37adea7cb038f38e4b78
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Sep 29 15:09:33 2022 +0800
[INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
---
.../inlong/dataproxy/config/ConfigManager.java | 6 ++++--
.../dataproxy/sink/pulsar/PulsarClientService.java | 25 ++++++++++++++++------
2 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 554881448..fa05427b9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
@@ -55,6 +56,7 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_IN
* Config manager class.
*/
public class ConfigManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
private static volatile boolean isInit = false;
@@ -157,7 +159,8 @@ public class ConfigManager {
// add new configure records
for (Map.Entry<String, String> entry : result.entrySet()) {
String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
- if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+ if ((entry.getValue() == null && !Objects.equals("null", oldValue))
+ || (entry.getValue() != null && !Objects.equals(entry.getValue(), oldValue))) {
changed = true;
}
}
@@ -256,7 +259,6 @@ public class ConfigManager {
*/
public static class ReloadConfigWorker extends Thread {
- private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
private final ConfigManager configManager;
private final CloseableHttpClient httpClient;
private final Gson gson = new Gson();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index bf4558313..66b06e93d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.pulsar;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -363,10 +364,15 @@ public class PulsarClientService {
private void removeProducers(PulsarClient pulsarClient) {
for (List<TopicProducerInfo> producers : producerInfoMap.values()) {
- for (TopicProducerInfo topicProducer : producers) {
- if (topicProducer.getPulsarClient().equals(pulsarClient)) {
- topicProducer.close();
- producers.remove(topicProducer);
+ if (producers == null || producers.isEmpty()) {
+ continue;
+ }
+ Iterator<TopicProducerInfo> it = producers.iterator();
+ while (it.hasNext()) {
+ TopicProducerInfo entry = it.next();
+ if (entry.getPulsarClient().equals(pulsarClient)) {
+ entry.close();
+ it.remove();
}
}
}
@@ -412,10 +418,17 @@ public class PulsarClientService {
topic);
info.initProducer();
if (info.isCanUseToSendMessage()) {
- producerInfoMap.computeIfAbsent(topic, k -> new ArrayList<>()).add(info);
+ List<TopicProducerInfo> producerInfos = producerInfoMap.get(topic);
+ if (producerInfos == null) {
+ List<TopicProducerInfo> tmpProdInfos = new ArrayList<>();
+ producerInfos = producerInfoMap.putIfAbsent(topic, tmpProdInfos);
+ if (producerInfos == null) {
+ producerInfos = tmpProdInfos;
+ }
+ }
+ producerInfos.add(info);
}
}
-
} catch (PulsarClientException e) {
callBack.handleCreateClientException(url);
logger.error("create connection error in pulsar sink, "