You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/06/01 08:06:12 UTC
[inlong] branch master updated: [INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c8c7b6a20 [INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)
c8c7b6a20 is described below
commit c8c7b6a20bea5e8555d7f3420cf15d2eb928fe7d
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Jun 1 16:06:06 2023 +0800
[INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)
---
inlong-dataproxy/bin/dataproxy-start.sh | 2 +-
.../inlong/dataproxy/config/ConfigHolder.java | 4 +-
.../inlong/dataproxy/config/ConfigManager.java | 28 -----------
.../inlong/dataproxy/config/PropertiesHolder.java | 3 +-
.../config/holder/GroupIdNumConfigHolder.java | 1 +
.../config/holder/MxPropertiesHolder.java | 58 ----------------------
.../config/holder/PropertiesConfigHolder.java | 2 +-
.../config/remote/ConfigMessageServlet.java | 29 +++--------
.../dataproxy/http/SimpleMessageHandler.java | 8 +--
.../dataproxy/source/ServerMessageHandler.java | 8 ---
.../dataproxy/source/SimpleMessageHandler.java | 7 ---
.../src/test/resources/mx.properties | 16 ------
12 files changed, 14 insertions(+), 152 deletions(-)
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index 811915ae2..cd70b4a5f 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -32,7 +32,7 @@ error() {
fi
}
-for i in {mx.properties,weight.properties,common.properties,blacklist.properties,whitelist.properties,groupid_mapping.properties,topics.properties,mq_cluster.properties}
+for i in {weight.properties,common.properties,blacklist.properties,whitelist.properties,groupid_mapping.properties,topics.properties,mq_cluster.properties}
do
if [ ! -f "$i" ]; then
touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index d6e533494..e07983550 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -130,7 +130,7 @@ public abstract class ConfigHolder {
}
}
- public AtomicBoolean getFileChanged() {
- return fileChanged;
+ public void setFileChanged() {
+ fileChanged.set(true);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 6307d2bf8..fd0cc5377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
@@ -79,7 +78,6 @@ public class ConfigManager {
private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
- private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
// source report configure holder
private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder();
// mq clusters ready
@@ -203,23 +201,6 @@ public class ConfigManager {
|| whitelistConfigHolder.isIllegalIP(strRemoteIP);
}
- // get mx configure info
- public Map<String, Map<String, String>> getMxPropertiesMaps() {
- return mxConfig.getMxPropertiesMaps();
- }
-
- public Map<String, String> getMxProperties() {
- return mxConfig.getHolder();
- }
-
- public boolean addMxProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mxConfig, true);
- }
-
- public boolean deleteMxProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mxConfig, false);
- }
-
public boolean updateTopicProperties(Map<String, String> result) {
return updatePropertiesHolder(result, topicConfig);
}
@@ -228,10 +209,6 @@ public class ConfigManager {
return updatePropertiesHolder(result, mqClusterConfigHolder);
}
- public boolean updateMxProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, mxConfig);
- }
-
public void addSourceReportInfo(String sourceIp, String sourcePort, String protocolType) {
sourceReportConfigHolder.addSourceInfo(sourceIp, sourcePort, protocolType);
}
@@ -410,7 +387,6 @@ public class ConfigManager {
// get groupId <-> topic and m value.
RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
Map<String, String> groupIdToTopic = new HashMap<>();
- Map<String, String> groupIdToMValue = new HashMap<>();
// include url2token and other params
Map<String, String> mqConfig = new HashMap<>();
@@ -443,14 +419,10 @@ public class ConfigManager {
|| StringUtils.isBlank(topic.getTopic())) {
continue;
}
- if (!StringUtils.isEmpty(topic.getM())) {
- groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
- }
if (!StringUtils.isEmpty(topic.getTopic())) {
groupIdToTopic.put(topic.getInlongGroupId().trim(), topic.getTopic().trim());
}
}
- configManager.updateMxProperties(groupIdToMValue);
configManager.updateTopicProperties(groupIdToTopic);
// other params for mq
mqConfig.putAll(clusterSet.get(0).getParams());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
index 4104ce409..ee4439829 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
@@ -212,6 +212,7 @@ public abstract class PropertiesHolder extends ConfigHolder {
String filePath = getFilePath();
if (StringUtils.isBlank(filePath)) {
LOG.error("Error in writing file {} as the file path is null.", getFileName());
+ return isSuccess;
}
readWriteLock.writeLock().lock();
try {
@@ -226,7 +227,7 @@ public abstract class PropertiesHolder extends ConfigHolder {
FileUtils.copyFile(tmpNewFile, sourceFile);
tmpNewFile.delete();
isSuccess = true;
- getFileChanged().set(true);
+ setFileChanged();
} catch (Throwable ex) {
LOG.error("Error in writing file {}", getFileName(), ex);
} finally {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
index f59b651a9..ef7b33eb9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
@@ -235,6 +235,7 @@ public class GroupIdNumConfigHolder extends PropertiesHolder {
storedMap.putAll(entry.getValue());
streamIdNumMap.put(entry.getKey(), storedMap);
} else {
+ rmvKeys.clear();
newDataMap = entry.getValue();
for (Map.Entry<String, String> entry1 : newDataMap.entrySet()) {
if (!entry1.getValue().equals(storedMap.get(entry.getKey()))) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
deleted file mode 100644
index 74b81999e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * value is map
- */
-public class MxPropertiesHolder extends PropertiesConfigHolder {
-
- private static final Logger LOG = LoggerFactory.getLogger(MxPropertiesHolder.class);
- private final Map<String, Map<String, String>> mxPropertiesMaps =
- new HashMap<String, Map<String, String>>();
-
- public MxPropertiesHolder(String fileName) {
- super(fileName);
- }
-
- /**
- * load m from file
- */
- @Override
- protected boolean loadFromFileToHolder() {
- super.loadFromFileToHolder();
- try {
- for (Map.Entry<String, String> entry : getHolder().entrySet()) {
- mxPropertiesMaps.put(entry.getKey(), MAP_SPLITTER.split(entry.getValue()));
- }
- } catch (Exception e) {
- LOG.error("loadConfig error :", e);
- }
- return true;
- }
-
- public Map<String, Map<String, String>> getMxPropertiesMaps() {
- return mxPropertiesMaps;
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index 2894f19bf..d2eb66563 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -105,7 +105,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
FileUtils.copyFile(tmpNewFile, sourceFile);
tmpNewFile.delete();
isSuccess = true;
- getFileChanged().set(true);
+ setFileChanged();
} catch (Exception ex) {
LOG.error("error in writing file", ex);
} finally {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 878bd9850..50bb7b1e8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -67,19 +67,6 @@ public class ConfigMessageServlet extends HttpServlet {
return false;
}
- private boolean handleMxConfig(RequestContent requestContent) {
- Map<String, String> groupIdToMValue = new HashMap<String, String>();
- for (Map<String, String> item : requestContent.getContent()) {
- groupIdToMValue.put(item.get("inlongGroupId"), item.get("m"));
- }
- if ("add".equals(requestContent.getOperationType())) {
- return configManager.addMxProperties(groupIdToMValue);
- } else if ("delete".equals(requestContent.getOperationType())) {
- return configManager.deleteMxProperties(groupIdToMValue);
- }
- return false;
- }
-
private void responseToJson(HttpServletResponse response,
ResponseResult result) throws IOException {
response.setContentType("application/json");
@@ -97,26 +84,22 @@ public class ConfigMessageServlet extends HttpServlet {
BufferedReader reader = null;
try {
reader = req.getReader();
- boolean isSuccess = false;
RequestContent requestContent = gson.fromJson(IOUtils.toString(reader),
RequestContent.class);
if (requestContent.getRequestType() != null
&& requestContent.getOperationType() != null) {
if ("topic".equals(requestContent.getRequestType())) {
- isSuccess = handleTopicConfig(requestContent);
+ if (handleTopicConfig(requestContent)) {
+ result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
+ } else {
+ result.setMessage("cannot operate config update, please check it");
+ }
} else if ("mx".equals(requestContent.getRequestType())) {
- isSuccess = handleMxConfig(requestContent);
+ result.setMessage("Unsupported operation");
}
} else {
result.setMessage("request format is not valid");
}
-
- if (isSuccess) {
- result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
- } else {
- result.setMessage("cannot operate config update, please check it");
- }
-
} catch (Exception ex) {
LOG.error("error while do post", ex);
result.setMessage(ex.getMessage());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index db8220ab7..59a765533 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -127,12 +127,6 @@ public class SimpleMessageHandler implements MessageHandler {
.append(AttrConstants.BODY)
.append(" must exist and not empty!").toString());
}
- // get m attribute
- String mxValue = "m=0";
- String configedMxAttr = configManager.getMxProperties().get(groupId);
- if (StringUtils.isNotEmpty(configedMxAttr)) {
- mxValue = configedMxAttr.trim();
- }
// convert context to http request
HttpServletRequest request =
(HttpServletRequest) context.get(AttrConstants.HTTP_REQUEST);
@@ -144,7 +138,7 @@ public class SimpleMessageHandler implements MessageHandler {
strMsgCount = String.valueOf(intMsgCnt);
// build message attributes
InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
- strBuff.append(mxValue).append("&groupId=").append(groupId)
+ strBuff.append("groupId=").append(groupId)
.append("&streamId=").append(streamId)
.append("&dt=").append(strDataTime)
.append("&NodeIP=").append(strRemoteIP)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5b2d725fe..ab1d77b53 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -376,14 +376,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
}
} else {
- // get configured m value
- Map<String, String> mxValue =
- configManager.getMxPropertiesMaps().get(groupId);
- if (mxValue != null && mxValue.size() != 0) {
- message.getAttributeMap().putAll(mxValue);
- } else {
- message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
- }
// get configured topic name
configTopic = configManager.getTopicName(groupId, streamId);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 4666e617f..5db66d1d5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -223,13 +223,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
if (StringUtils.isNotEmpty(value)) {
topicInfo.set(value.trim());
}
-
- Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
- if (mxValue != null && mxValue.size() != 0) {
- message.getAttributeMap().putAll(mxValue);
- } else {
- message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
- }
} else {
String num2name = commonAttrMap.get(AttrConstants.NUM2NAME);
String groupIdNum = commonAttrMap.get(AttrConstants.GROUPID_NUM);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties
deleted file mode 100644
index 7ccdf1872..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#