You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/03 08:08:14 UTC

[incubator-inlong] branch master updated: [INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a91cac  [INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)
0a91cac is described below

commit 0a91cac44849a50508e73354fc06c7fb034422d7
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Mar 3 16:08:10 2022 +0800

    [INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)
---
 inlong-dataproxy/bin/dataproxy-start.sh            |  2 +-
 .../inlong/dataproxy/config/ConfigManager.java     | 38 ++++++++++++++++------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index f2b1188..3dfc7a8 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -20,7 +20,7 @@
 #
 cd "$(dirname "$0")"/../conf || exit
 
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,thrid_party_cluster.properties}
 do
   if [ ! -f "$i" ]; then
     touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 0cf74d7..02a3794 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.config;
 
 import com.google.gson.Gson;
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.config.RequestConfig;
@@ -115,16 +116,22 @@ public class ConfigManager {
      * @param addElseRemove - if add(true) else remove(false)
      * @return true if changed else false.
      */
-    private boolean updatePropertiesHolder(Map<String, String> result,
-            PropertiesConfigHolder holder, boolean addElseRemove) {
+    private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder,
+            boolean addElseRemove) {
         Map<String, String> tmpHolder = holder.forkHolder();
         boolean changed = false;
+
         for (Map.Entry<String, String> entry : result.entrySet()) {
-            String oldValue = addElseRemove
-                    ? tmpHolder.put(entry.getKey(), entry.getValue()) : tmpHolder.remove(entry.getKey());
-            // if addElseRemove is false, that means removing item, changed is true.
-            if (oldValue == null || !oldValue.equals(entry.getValue()) || !addElseRemove) {
-                changed = true;
+            if (addElseRemove) {
+                String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+                if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+                    changed = true;
+                }
+            } else {
+                String oldValue = tmpHolder.remove(entry.getKey());
+                if (oldValue != null) {
+                    changed = true;
+                }
             }
         }
 
@@ -271,6 +278,7 @@ public class ConfigManager {
             try {
                 if (StringUtils.isEmpty(proxyClusterName)) {
                     LOG.error("proxyClusterName is null");
+                    return false;
                 }
                 String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
                         + proxyClusterName;
@@ -297,6 +305,12 @@ public class ConfigManager {
                      */
                     int index = 1;
                     List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
+
+                    if (clusterSet == null || clusterSet.isEmpty()) {
+                        LOG.error("getConfig from manager: no available mq config");
+                        return false;
+                    }
+
                     for (ThirdPartyClusterInfo mqCluster : clusterSet) {
                         String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
                         String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
@@ -308,15 +322,19 @@ public class ConfigManager {
                     mqConfig.putAll(clusterSet.get(0).getParams());
 
                     for (DataProxyConfig topic : configJson.getData().getTopicList()) {
-                        groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
-                        groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
+                        if (!StringUtils.isEmpty(topic.getM())) {
+                            groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
+                        }
+                        if (!StringUtils.isEmpty(topic.getTopic())) {
+                            groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
+                        }
                     }
                     configManager.addMxProperties(groupIdToMValue);
                     configManager.addTopicProperties(groupIdToTopic);
                     configManager.updateThirdPartyClusterProperties(mqConfig);
 
                     // store mq common configs and url2token
-                    configManager.getThirdPartyClusterConfig().putAll(clusterSet.get(0).getParams());
+                    configManager.getThirdPartyClusterConfig().putAll(mqConfig);
                     configManager.getThirdPartyClusterHolder()
                             .setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
                 } else {