You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/03 08:08:14 UTC
[incubator-inlong] branch master updated: [INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0a91cac [INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)
0a91cac is described below
commit 0a91cac44849a50508e73354fc06c7fb034422d7
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Mar 3 16:08:10 2022 +0800
[INLONG-2802][DataProxy] Fix mx.properties local file updating too often (#2811)
---
inlong-dataproxy/bin/dataproxy-start.sh | 2 +-
.../inlong/dataproxy/config/ConfigManager.java | 38 ++++++++++++++++------
2 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index f2b1188..3dfc7a8 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -20,7 +20,7 @@
#
cd "$(dirname "$0")"/../conf || exit
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,thrid_party_cluster.properties}
do
if [ ! -f "$i" ]; then
touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 0cf74d7..02a3794 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.config;
import com.google.gson.Gson;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
@@ -115,16 +116,22 @@ public class ConfigManager {
* @param addElseRemove - if add(true) else remove(false)
* @return true if changed else false.
*/
- private boolean updatePropertiesHolder(Map<String, String> result,
- PropertiesConfigHolder holder, boolean addElseRemove) {
+ private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder,
+ boolean addElseRemove) {
Map<String, String> tmpHolder = holder.forkHolder();
boolean changed = false;
+
for (Map.Entry<String, String> entry : result.entrySet()) {
- String oldValue = addElseRemove
- ? tmpHolder.put(entry.getKey(), entry.getValue()) : tmpHolder.remove(entry.getKey());
- // if addElseRemove is false, that means removing item, changed is true.
- if (oldValue == null || !oldValue.equals(entry.getValue()) || !addElseRemove) {
- changed = true;
+ if (addElseRemove) {
+ String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+ if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+ changed = true;
+ }
+ } else {
+ String oldValue = tmpHolder.remove(entry.getKey());
+ if (oldValue != null) {
+ changed = true;
+ }
}
}
@@ -271,6 +278,7 @@ public class ConfigManager {
try {
if (StringUtils.isEmpty(proxyClusterName)) {
LOG.error("proxyClusterName is null");
+ return false;
}
String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
+ proxyClusterName;
@@ -297,6 +305,12 @@ public class ConfigManager {
*/
int index = 1;
List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
+
+ if (clusterSet == null || clusterSet.isEmpty()) {
+ LOG.error("getConfig from manager: no available mq config");
+ return false;
+ }
+
for (ThirdPartyClusterInfo mqCluster : clusterSet) {
String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
@@ -308,15 +322,19 @@ public class ConfigManager {
mqConfig.putAll(clusterSet.get(0).getParams());
for (DataProxyConfig topic : configJson.getData().getTopicList()) {
- groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
- groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
+ if (!StringUtils.isEmpty(topic.getM())) {
+ groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
+ }
+ if (!StringUtils.isEmpty(topic.getTopic())) {
+ groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
+ }
}
configManager.addMxProperties(groupIdToMValue);
configManager.addTopicProperties(groupIdToTopic);
configManager.updateThirdPartyClusterProperties(mqConfig);
// store mq common configs and url2token
- configManager.getThirdPartyClusterConfig().putAll(clusterSet.get(0).getParams());
+ configManager.getThirdPartyClusterConfig().putAll(mqConfig);
configManager.getThirdPartyClusterHolder()
.setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
} else {