You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/29 07:32:31 UTC

[inlong] branch release-1.3.0 updated: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new a78b25523 [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
a78b25523 is described below

commit a78b255236f1f50aed89ed92a8e0956e0ecf9e21
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Sep 28 10:02:23 2022 +0800

    [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
---
 .../inlong/dataproxy/config/ConfigManager.java     | 100 +++++++++++++++------
 1 file changed, 72 insertions(+), 28 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index f66b46209..554881448 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -101,6 +102,72 @@ public class ConfigManager {
         return topicConfig.getHolder();
     }
 
+    public boolean addTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig, true);
+    }
+
+    public boolean deleteTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig, false);
+    }
+
+    public Map<String, String> getMxProperties() {
+        return mxConfig.getHolder();
+    }
+
+    public boolean addMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig, true);
+    }
+
+    public boolean deleteMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig, false);
+    }
+
+    public boolean updateTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig);
+    }
+
+    public boolean updateMQClusterProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mqClusterConfigHolder);
+    }
+
+    public boolean updateMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig);
+    }
+
+    /**
+     * update old maps, reload local files if changed.
+     *
+     * @param result - map pending to be added
+     * @param holder - property holder
+     * @return true if changed else false.
+     */
+    private boolean updatePropertiesHolder(Map<String, String> result,
+                                           PropertiesConfigHolder holder) {
+        boolean changed = false;
+        Map<String, String> tmpHolder = holder.forkHolder();
+        // Delete non-existent configuration records
+        Iterator<Map.Entry<String, String>> it = tmpHolder.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            if (!result.containsKey(entry.getKey())) {
+                it.remove();
+                changed = true;
+            }
+        }
+        // add new configure records
+        for (Map.Entry<String, String> entry : result.entrySet()) {
+            String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+            if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+                changed = true;
+            }
+        }
+        if (changed) {
+            return holder.loadFromHolderToFile(tmpHolder);
+        } else {
+            return false;
+        }
+    }
+
     /**
      * update old maps, reload local files if changed.
      *
@@ -109,8 +176,9 @@ public class ConfigManager {
      * @param addElseRemove - if add(true) else remove(false)
      * @return true if changed else false.
      */
-    private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder,
-            boolean addElseRemove) {
+    private boolean updatePropertiesHolder(Map<String, String> result,
+                                           PropertiesConfigHolder holder,
+                                           boolean addElseRemove) {
         Map<String, String> tmpHolder = holder.forkHolder();
         boolean changed = false;
 
@@ -135,30 +203,6 @@ public class ConfigManager {
         }
     }
 
-    public boolean addTopicProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, topicConfig, true);
-    }
-
-    public boolean deleteTopicProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, topicConfig, false);
-    }
-
-    public boolean updateMQClusterProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mqClusterConfigHolder, true);
-    }
-
-    public Map<String, String> getMxProperties() {
-        return mxConfig.getHolder();
-    }
-
-    public boolean addMxProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mxConfig, true);
-    }
-
-    public boolean deleteMxProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mxConfig, false);
-    }
-
     public Map<String, String> getDcMappingProperties() {
         return dcConfig.getHolder();
     }
@@ -328,8 +372,8 @@ public class ConfigManager {
                             groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
                         }
                     }
-                    configManager.addMxProperties(groupIdToMValue);
-                    configManager.addTopicProperties(groupIdToTopic);
+                    configManager.updateMxProperties(groupIdToMValue);
+                    configManager.updateTopicProperties(groupIdToTopic);
                     // other params for mq
                     mqConfig.putAll(clusterSet.get(0).getParams());
                     configManager.updateMQClusterProperties(mqConfig);