You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/28 02:02:29 UTC
[inlong] branch master updated: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2a8f81948 [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
2a8f81948 is described below
commit 2a8f819485cb99de707f61922e43dd2b1e9f5070
Author: Goson Zhang <46...@qq.com>
AuthorDate: Wed Sep 28 10:02:23 2022 +0800
[INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
---
.../inlong/dataproxy/config/ConfigManager.java | 100 +++++++++++++++------
1 file changed, 72 insertions(+), 28 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index f66b46209..554881448 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -101,6 +102,72 @@ public class ConfigManager {
return topicConfig.getHolder();
}
+ public boolean addTopicProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, topicConfig, true);
+ }
+
+ public boolean deleteTopicProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, topicConfig, false);
+ }
+
+ public Map<String, String> getMxProperties() {
+ return mxConfig.getHolder();
+ }
+
+ public boolean addMxProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, mxConfig, true);
+ }
+
+ public boolean deleteMxProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, mxConfig, false);
+ }
+
+ public boolean updateTopicProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, topicConfig);
+ }
+
+ public boolean updateMQClusterProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, mqClusterConfigHolder);
+ }
+
+ public boolean updateMxProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, mxConfig);
+ }
+
+ /**
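+ * Synchronize the holder with the given map: drop keys missing from it, add or overwrite the rest, and persist the holder if anything changed.
+ *
+ * @param result - the complete set of entries the holder should contain afterwards
+ * @param holder - property holder
+ * @return the result of holder.loadFromHolderToFile if anything changed, else false.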
+ */
+ private boolean updatePropertiesHolder(Map<String, String> result,
+ PropertiesConfigHolder holder) {
+ boolean changed = false;
+ Map<String, String> tmpHolder = holder.forkHolder();
+ // Delete existing records whose keys are absent from the new configuration
+ Iterator<Map.Entry<String, String>> it = tmpHolder.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ if (!result.containsKey(entry.getKey())) {
+ it.remove();
+ changed = true;
+ }
+ }
+ // Add new configuration records and overwrite changed values
+ for (Map.Entry<String, String> entry : result.entrySet()) {
+ String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+ if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+ changed = true;
+ }
+ }
+ if (changed) {
+ return holder.loadFromHolderToFile(tmpHolder);
+ } else {
+ return false;
+ }
+ }
+
/**
* update old maps, reload local files if changed.
*
@@ -109,8 +176,9 @@ public class ConfigManager {
* @param addElseRemove - if add(true) else remove(false)
* @return true if changed else false.
*/
- private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder,
- boolean addElseRemove) {
+ private boolean updatePropertiesHolder(Map<String, String> result,
+ PropertiesConfigHolder holder,
+ boolean addElseRemove) {
Map<String, String> tmpHolder = holder.forkHolder();
boolean changed = false;
@@ -135,30 +203,6 @@ public class ConfigManager {
}
}
- public boolean addTopicProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, topicConfig, true);
- }
-
- public boolean deleteTopicProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, topicConfig, false);
- }
-
- public boolean updateMQClusterProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mqClusterConfigHolder, true);
- }
-
- public Map<String, String> getMxProperties() {
- return mxConfig.getHolder();
- }
-
- public boolean addMxProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mxConfig, true);
- }
-
- public boolean deleteMxProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mxConfig, false);
- }
-
public Map<String, String> getDcMappingProperties() {
return dcConfig.getHolder();
}
@@ -328,8 +372,8 @@ public class ConfigManager {
groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
}
}
- configManager.addMxProperties(groupIdToMValue);
- configManager.addTopicProperties(groupIdToTopic);
+ configManager.updateMxProperties(groupIdToMValue);
+ configManager.updateTopicProperties(groupIdToTopic);
// other params for mq
mqConfig.putAll(clusterSet.get(0).getParams());
configManager.updateMQClusterProperties(mqConfig);