You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/06/01 08:06:12 UTC

[inlong] branch master updated: [INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c7b6a20 [INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)
c8c7b6a20 is described below

commit c8c7b6a20bea5e8555d7f3420cf15d2eb928fe7d
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Jun 1 16:06:06 2023 +0800

    [INLONG-8135][DataProxy] Optimize ConfigManager implementation (Part two) (#8138)
---
 inlong-dataproxy/bin/dataproxy-start.sh            |  2 +-
 .../inlong/dataproxy/config/ConfigHolder.java      |  4 +-
 .../inlong/dataproxy/config/ConfigManager.java     | 28 -----------
 .../inlong/dataproxy/config/PropertiesHolder.java  |  3 +-
 .../config/holder/GroupIdNumConfigHolder.java      |  1 +
 .../config/holder/MxPropertiesHolder.java          | 58 ----------------------
 .../config/holder/PropertiesConfigHolder.java      |  2 +-
 .../config/remote/ConfigMessageServlet.java        | 29 +++--------
 .../dataproxy/http/SimpleMessageHandler.java       |  8 +--
 .../dataproxy/source/ServerMessageHandler.java     |  8 ---
 .../dataproxy/source/SimpleMessageHandler.java     |  7 ---
 .../src/test/resources/mx.properties               | 16 ------
 12 files changed, 14 insertions(+), 152 deletions(-)

diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index 811915ae2..cd70b4a5f 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -32,7 +32,7 @@ error() {
   fi
 }
 
-for i in {mx.properties,weight.properties,common.properties,blacklist.properties,whitelist.properties,groupid_mapping.properties,topics.properties,mq_cluster.properties}
+for i in {weight.properties,common.properties,blacklist.properties,whitelist.properties,groupid_mapping.properties,topics.properties,mq_cluster.properties}
 do
   if [ ! -f "$i" ]; then
     touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
index d6e533494..e07983550 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigHolder.java
@@ -130,7 +130,7 @@ public abstract class ConfigHolder {
         }
     }
 
-    public AtomicBoolean getFileChanged() {
-        return fileChanged;
+    public void setFileChanged() {
+        fileChanged.set(true);
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 6307d2bf8..fd0cc5377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
@@ -79,7 +78,6 @@ public class ConfigManager {
 
     private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
     private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
-    private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
     // source report configure holder
     private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder();
     // mq clusters ready
@@ -203,23 +201,6 @@ public class ConfigManager {
                 || whitelistConfigHolder.isIllegalIP(strRemoteIP);
     }
 
-    // get mx configure info
-    public Map<String, Map<String, String>> getMxPropertiesMaps() {
-        return mxConfig.getMxPropertiesMaps();
-    }
-
-    public Map<String, String> getMxProperties() {
-        return mxConfig.getHolder();
-    }
-
-    public boolean addMxProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mxConfig, true);
-    }
-
-    public boolean deleteMxProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mxConfig, false);
-    }
-
     public boolean updateTopicProperties(Map<String, String> result) {
         return updatePropertiesHolder(result, topicConfig);
     }
@@ -228,10 +209,6 @@ public class ConfigManager {
         return updatePropertiesHolder(result, mqClusterConfigHolder);
     }
 
-    public boolean updateMxProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mxConfig);
-    }
-
     public void addSourceReportInfo(String sourceIp, String sourcePort, String protocolType) {
         sourceReportConfigHolder.addSourceInfo(sourceIp, sourcePort, protocolType);
     }
@@ -410,7 +387,6 @@ public class ConfigManager {
                 // get groupId <-> topic and m value.
                 RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
                 Map<String, String> groupIdToTopic = new HashMap<>();
-                Map<String, String> groupIdToMValue = new HashMap<>();
                 // include url2token and other params
                 Map<String, String> mqConfig = new HashMap<>();
 
@@ -443,14 +419,10 @@ public class ConfigManager {
                                 || StringUtils.isBlank(topic.getTopic())) {
                             continue;
                         }
-                        if (!StringUtils.isEmpty(topic.getM())) {
-                            groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
-                        }
                         if (!StringUtils.isEmpty(topic.getTopic())) {
                             groupIdToTopic.put(topic.getInlongGroupId().trim(), topic.getTopic().trim());
                         }
                     }
-                    configManager.updateMxProperties(groupIdToMValue);
                     configManager.updateTopicProperties(groupIdToTopic);
                     // other params for mq
                     mqConfig.putAll(clusterSet.get(0).getParams());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
index 4104ce409..ee4439829 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/PropertiesHolder.java
@@ -212,6 +212,7 @@ public abstract class PropertiesHolder extends ConfigHolder {
         String filePath = getFilePath();
         if (StringUtils.isBlank(filePath)) {
             LOG.error("Error in writing file {} as the file path is null.", getFileName());
+            return isSuccess;
         }
         readWriteLock.writeLock().lock();
         try {
@@ -226,7 +227,7 @@ public abstract class PropertiesHolder extends ConfigHolder {
             FileUtils.copyFile(tmpNewFile, sourceFile);
             tmpNewFile.delete();
             isSuccess = true;
-            getFileChanged().set(true);
+            setFileChanged();
         } catch (Throwable ex) {
             LOG.error("Error in writing file {}", getFileName(), ex);
         } finally {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
index f59b651a9..ef7b33eb9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/GroupIdNumConfigHolder.java
@@ -235,6 +235,7 @@ public class GroupIdNumConfigHolder extends PropertiesHolder {
                 storedMap.putAll(entry.getValue());
                 streamIdNumMap.put(entry.getKey(), storedMap);
             } else {
+                rmvKeys.clear();
                 newDataMap = entry.getValue();
                 for (Map.Entry<String, String> entry1 : newDataMap.entrySet()) {
                     if (!entry1.getValue().equals(storedMap.get(entry.getKey()))) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
deleted file mode 100644
index 74b81999e..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MxPropertiesHolder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * value is map
- */
-public class MxPropertiesHolder extends PropertiesConfigHolder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MxPropertiesHolder.class);
-    private final Map<String, Map<String, String>> mxPropertiesMaps =
-            new HashMap<String, Map<String, String>>();
-
-    public MxPropertiesHolder(String fileName) {
-        super(fileName);
-    }
-
-    /**
-     * load m from file
-     */
-    @Override
-    protected boolean loadFromFileToHolder() {
-        super.loadFromFileToHolder();
-        try {
-            for (Map.Entry<String, String> entry : getHolder().entrySet()) {
-                mxPropertiesMaps.put(entry.getKey(), MAP_SPLITTER.split(entry.getValue()));
-            }
-        } catch (Exception e) {
-            LOG.error("loadConfig error :", e);
-        }
-        return true;
-    }
-
-    public Map<String, Map<String, String>> getMxPropertiesMaps() {
-        return mxPropertiesMaps;
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index 2894f19bf..d2eb66563 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -105,7 +105,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
             FileUtils.copyFile(tmpNewFile, sourceFile);
             tmpNewFile.delete();
             isSuccess = true;
-            getFileChanged().set(true);
+            setFileChanged();
         } catch (Exception ex) {
             LOG.error("error in writing file", ex);
         } finally {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 878bd9850..50bb7b1e8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -67,19 +67,6 @@ public class ConfigMessageServlet extends HttpServlet {
         return false;
     }
 
-    private boolean handleMxConfig(RequestContent requestContent) {
-        Map<String, String> groupIdToMValue = new HashMap<String, String>();
-        for (Map<String, String> item : requestContent.getContent()) {
-            groupIdToMValue.put(item.get("inlongGroupId"), item.get("m"));
-        }
-        if ("add".equals(requestContent.getOperationType())) {
-            return configManager.addMxProperties(groupIdToMValue);
-        } else if ("delete".equals(requestContent.getOperationType())) {
-            return configManager.deleteMxProperties(groupIdToMValue);
-        }
-        return false;
-    }
-
     private void responseToJson(HttpServletResponse response,
             ResponseResult result) throws IOException {
         response.setContentType("application/json");
@@ -97,26 +84,22 @@ public class ConfigMessageServlet extends HttpServlet {
         BufferedReader reader = null;
         try {
             reader = req.getReader();
-            boolean isSuccess = false;
             RequestContent requestContent = gson.fromJson(IOUtils.toString(reader),
                     RequestContent.class);
             if (requestContent.getRequestType() != null
                     && requestContent.getOperationType() != null) {
                 if ("topic".equals(requestContent.getRequestType())) {
-                    isSuccess = handleTopicConfig(requestContent);
+                    if (handleTopicConfig(requestContent)) {
+                        result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
+                    } else {
+                        result.setMessage("cannot operate config update, please check it");
+                    }
                 } else if ("mx".equals(requestContent.getRequestType())) {
-                    isSuccess = handleMxConfig(requestContent);
+                    result.setMessage("Unsupported operation");
                 }
             } else {
                 result.setMessage("request format is not valid");
             }
-
-            if (isSuccess) {
-                result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
-            } else {
-                result.setMessage("cannot operate config update, please check it");
-            }
-
         } catch (Exception ex) {
             LOG.error("error while do post", ex);
             result.setMessage(ex.getMessage());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index db8220ab7..59a765533 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -127,12 +127,6 @@ public class SimpleMessageHandler implements MessageHandler {
                     .append(AttrConstants.BODY)
                     .append(" must exist and not empty!").toString());
         }
-        // get m attribute
-        String mxValue = "m=0";
-        String configedMxAttr = configManager.getMxProperties().get(groupId);
-        if (StringUtils.isNotEmpty(configedMxAttr)) {
-            mxValue = configedMxAttr.trim();
-        }
         // convert context to http request
         HttpServletRequest request =
                 (HttpServletRequest) context.get(AttrConstants.HTTP_REQUEST);
@@ -144,7 +138,7 @@ public class SimpleMessageHandler implements MessageHandler {
         strMsgCount = String.valueOf(intMsgCnt);
         // build message attributes
         InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
-        strBuff.append(mxValue).append("&groupId=").append(groupId)
+        strBuff.append("groupId=").append(groupId)
                 .append("&streamId=").append(streamId)
                 .append("&dt=").append(strDataTime)
                 .append("&NodeIP=").append(strRemoteIP)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5b2d725fe..ab1d77b53 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -376,14 +376,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     }
                 }
             } else {
-                // get configured m value
-                Map<String, String> mxValue =
-                        configManager.getMxPropertiesMaps().get(groupId);
-                if (mxValue != null && mxValue.size() != 0) {
-                    message.getAttributeMap().putAll(mxValue);
-                } else {
-                    message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
-                }
                 // get configured topic name
                 configTopic = configManager.getTopicName(groupId, streamId);
             }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 4666e617f..5db66d1d5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -223,13 +223,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
             if (StringUtils.isNotEmpty(value)) {
                 topicInfo.set(value.trim());
             }
-
-            Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
-            if (mxValue != null && mxValue.size() != 0) {
-                message.getAttributeMap().putAll(mxValue);
-            } else {
-                message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
-            }
         } else {
             String num2name = commonAttrMap.get(AttrConstants.NUM2NAME);
             String groupIdNum = commonAttrMap.get(AttrConstants.GROUPID_NUM);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties
deleted file mode 100644
index 7ccdf1872..000000000
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/mx.properties
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#