You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/04/30 13:16:55 UTC

[inlong] branch master updated: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d0eb13483 [INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)
d0eb13483 is described below

commit d0eb13483eded8106883e1606e74b266cc118df4
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Apr 30 21:16:49 2023 +0800

    [INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)
---
 .../apache/inlong/dataproxy/config/AuthUtils.java  |  10 +-
 .../dataproxy/config/CommonConfigHolder.java       | 477 +++++++++++++++++++++
 .../inlong/dataproxy/config/ConfigManager.java     |  42 +-
 .../config/DefaultManagerIpListParser.java         |  24 +-
 .../dataproxy/config/IManagerIpListParser.java     |   4 -
 .../dataproxy/config/RemoteConfigManager.java      |  27 +-
 .../config/holder/CommonPropertiesHolder.java      | 222 ----------
 .../ClassResourceCommonPropertiesLoader.java       |  60 ---
 .../config/loader/CommonPropertiesLoader.java      |  34 --
 .../inlong/dataproxy/consts/ConfigConstants.java   |  10 -
 .../dataproxy/heartbeat/HeartbeatManager.java      |  47 +-
 .../inlong/dataproxy/http/HttpBaseSource.java      |   8 +-
 .../dataproxy/metrics/DataProxyMetricItem.java     |   4 +-
 .../dataproxy/metrics/DataProxyMetricItemSet.java  |   5 +-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  41 +-
 .../prometheus/PrometheusMetricListener.java       |  10 +-
 .../apache/inlong/dataproxy/node/Application.java  |   8 +-
 .../inlong/dataproxy/sink/common/SinkContext.java  |  15 +-
 .../sink/mq/MessageQueueZoneSinkContext.java       |  27 +-
 .../apache/inlong/dataproxy/source/BaseSource.java |   8 +-
 .../dataproxy/source/ServerMessageHandler.java     |   6 +-
 .../inlong/dataproxy/source/SourceContext.java     |   6 +-
 .../source/tcp/InlongTcpChannelHandler.java        |  12 +-
 .../TestCommonConfigHolder.java}                   |  26 +-
 24 files changed, 601 insertions(+), 532 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
index 83f4be01b..3aa5661df 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
@@ -18,12 +18,9 @@
 package org.apache.inlong.dataproxy.config;
 
 import org.apache.inlong.common.util.BasicAuth;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public class AuthUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@ -32,10 +29,9 @@ public class AuthUtils {
      * Generate http basic auth credential from configured secretId and secretKey
      */
     public static String genBasicAuth() {
-        Map<String, String> properties = ConfigManager.getInstance().getCommonProperties();
-        String secretId = properties.get(ConfigConstants.MANAGER_AUTH_SECRET_ID);
-        String secretKey = properties.get(ConfigConstants.MANAGER_AUTH_SECRET_KEY);
-        return BasicAuth.genBasicAuthCredential(secretId, secretKey);
+        return BasicAuth.genBasicAuthCredential(
+                CommonConfigHolder.getInstance().getManagerAuthSecretId(),
+                CommonConfigHolder.getInstance().getManagerAuthSecretKey());
     }
 
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
new file mode 100644
index 000000000..d6f3aec76
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
+import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Configuration holder for common.properties
+ */
+public class CommonConfigHolder {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
+    // configuration file name
+    private static final String COMMON_CONFIG_FILE_NAME = "common.properties";
+    // **** allowed keys and default value, begin
+    // cluster tag
+    public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+    public static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
+    // cluster name
+    public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
+    public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
+    // cluster incharges
+    public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
+    public static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
+    // cluster ext tag
+    public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
+    // predefined format of ext tag: {key}={value}
+    public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
+    // manager type
+    public static final String KEY_MANAGER_TYPE = "manager.type";
+    public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName();
+    // manager hosts
+    public static final String KEY_MANAGER_HOSTS = "manager.hosts";
+    public static final String KEY_MANAGER_HOSTS_SEPARATOR = ",";
+    // manager auth secret id
+    public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
+    // manager auth secret key
+    public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
+    // configuration file check interval, in milliseconds
+    private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval";
+    public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 60000L;
+    // Whether to accept messages without mapping between groupId/streamId and topic
+    public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept";
+    public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+    // whether enable whitelist, optional field.
+    public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
+    public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+    // Audit fields
+    public static final String KEY_ENABLE_AUDIT = "audit.enable";
+    public static final boolean VAL_DEF_ENABLE_AUDIT = true;
+    public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+    public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+    public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
+    public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
+    public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
+    public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval";
+    public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+    // Whether to respond after the message is saved
+    public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+    public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
+    // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+    public static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
+    public static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
+    // max buffer queue size in Kb
+    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+    public static final int VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+    // event handler
+    public static final String KEY_EVENT_HANDLER = "eventHandler";
+    public static final String VAL_DEF_EVENT_HANDLER = DefaultEventHandler.class.getName();
+    // cache cluster selector
+    public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
+    public static final String VAL_DEF_CACHE_CLUSTER_SELECTOR = AllCacheClusterSelector.class.getName();
+    // proxy node id
+    public static final String KEY_PROXY_NODE_ID = "nodeId";
+    public static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
+    // compression type of sent messages
+    public static final String KEY_MSG_SENT_COMPRESS_TYPE = "compressType";
+    public static final String VAL_DEF_MSG_COMPRESS_TYPE = ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name();
+    // prometheus http port
+    public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+    public static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
+    // **** allowed keys and default value, end
+
+    // class instance
+    private static CommonConfigHolder instance = null;
+    private static volatile boolean isInit = false;
+    private Map<String, String> props;
+    // pre-read field values
+    private String clusterTag = VAL_DEF_CLUSTER_TAG;
+    private String clusterName = VAL_DEF_CLUSTER_NAME;
+    private String clusterIncharges = VAL_DEF_CLUSTER_INCHARGES;
+    private String clusterExtTag = VAL_DEF_CLUSTER_EXT_TAG;
+    private String managerType = VAL_DEF_MANAGER_TYPE;
+    private IManagerIpListParser ipListParser = null;
+    private String managerAuthSecretId = "";
+    private String managerAuthSecretKey = "";
+    private long configChkInvlMs = VAL_DEF_CONFIG_CHECK_INTERVAL_MS;
+    private boolean enableAudit = VAL_DEF_ENABLE_AUDIT;
+    private final HashSet<String> auditProxys = new HashSet<>();
+    private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
+    private int auditMaxCacheRows = VAL_DEF_AUDIT_MAX_CACHE_ROWS;
+    private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
+    private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
+    private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
+    private boolean noTopicAccept = VAL_DEF_NOTFOUND_TOPIC_ACCEPT;
+    private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
+    private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
+    private String eventHandler = VAL_DEF_EVENT_HANDLER;
+    private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
+    private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
+    private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
+    private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+
+    /**
+     * get instance for common.properties config manager
+     */
+    public static CommonConfigHolder getInstance() {
+        if (isInit && instance != null) {
+            return instance;
+        }
+        synchronized (CommonConfigHolder.class) {
+            if (!isInit) {
+                instance = new CommonConfigHolder();
+                if (instance.loadConfigFile()) {
+                    instance.preReadFields();
+                }
+                isInit = true;
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Get the original attribute map
+     *
+     * Notice: only fields that are not pre-read should be looked up in this map;
+     *         pre-read fields MUST be obtained through the getter methods of this class.
+     */
+    public Map<String, String> getProperties() {
+        return this.props;
+    }
+
+    /**
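+     * Get a string value from the context, falling back to common.properties, then to the default value
+     *
+     * @param context       the Flume context that is consulted first
+     * @param key           the configuration key to look up
+     * @param defaultValue  the value returned when neither the context nor common.properties has the key
+     * @return the resolved value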
+     */
+    public static String getStringFromContext(Context context, String key, String defaultValue) {
+        String value = context.getString(key);
+        value = (value != null) ? value : getInstance().getProperties().getOrDefault(key, defaultValue);
+        return value;
+    }
+
+    public String getClusterTag() {
+        return clusterTag;
+    }
+
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    public String getClusterIncharges() {
+        return clusterIncharges;
+    }
+
+    public String getClusterExtTag() {
+        return clusterExtTag;
+    }
+
+    public long getConfigChkInvlMs() {
+        return configChkInvlMs;
+    }
+
+    public boolean isNoTopicAccept() {
+        return noTopicAccept;
+    }
+
+    public boolean isEnableWhiteList() {
+        return this.enableWhiteList;
+    }
+
+    public String getManagerType() {
+        return managerType;
+    }
+
+    public List<String> getManagerHosts() {
+        return this.ipListParser.getIpList();
+    }
+
+    public String getManagerAuthSecretId() {
+        return managerAuthSecretId;
+    }
+
+    public String getManagerAuthSecretKey() {
+        return managerAuthSecretKey;
+    }
+
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    public HashSet<String> getAuditProxys() {
+        return auditProxys;
+    }
+
+    public String getAuditFilePath() {
+        return auditFilePath;
+    }
+
+    public int getAuditMaxCacheRows() {
+        return auditMaxCacheRows;
+    }
+
+    public long getAuditFormatInvlMs() {
+        return auditFormatInvlMs;
+    }
+
+    public boolean isResponseAfterSave() {
+        return responseAfterSave;
+    }
+
+    public long getMaxResAfterSaveTimeout() {
+        return maxResAfterSaveTimeout;
+    }
+
+    public int getMaxBufferQueueSizeKb() {
+        return maxBufferQueueSizeKb;
+    }
+
+    public String getEventHandler() {
+        return eventHandler;
+    }
+
+    public String getCacheClusterSelector() {
+        return cacheClusterSelector;
+    }
+
+    public int getPrometheusHttpPort() {
+        return prometheusHttpPort;
+    }
+
+    public String getProxyNodeId() {
+        return proxyNodeId;
+    }
+
+    public String getMsgCompressType() {
+        return msgCompressType;
+    }
+
+    private void preReadFields() {
+        String tmpValue;
+        // read cluster tag
+        tmpValue = this.props.get(KEY_PROXY_CLUSTER_TAG);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.clusterTag = tmpValue.trim();
+        }
+        // read cluster name
+        tmpValue = this.props.get(KEY_PROXY_CLUSTER_NAME);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.clusterName = tmpValue.trim();
+        }
+        // read cluster incharges
+        tmpValue = this.props.get(KEY_PROXY_CLUSTER_INCHARGES);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.clusterIncharges = tmpValue.trim();
+        }
+        tmpValue = this.props.get(KEY_PROXY_CLUSTER_EXT_TAG);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.clusterExtTag = tmpValue.trim();
+        }
+        // read configure check interval
+        tmpValue = this.props.get(KEY_CONFIG_CHECK_INTERVAL_MS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.configChkInvlMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_CONFIG_CHECK_INTERVAL_MS);
+        }
+        // read whether accept msg without topic
+        tmpValue = this.props.get(KEY_NOTFOUND_TOPIC_ACCEPT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.noTopicAccept = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read enable whitelist
+        tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read manager type
+        tmpValue = this.props.get(KEY_MANAGER_TYPE);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.managerType = tmpValue.trim();
+        }
+        // read manager auth secret id
+        tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_ID);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.managerAuthSecretId = tmpValue.trim();
+        }
+        // read manager auth secret key
+        tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_KEY);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.managerAuthSecretKey = tmpValue.trim();
+        }
+        // read whether enable audit
+        tmpValue = this.props.get(KEY_ENABLE_AUDIT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read audit proxys
+        tmpValue = this.props.get(KEY_AUDIT_PROXYS);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            String[] ipPorts = tmpValue.split("\\s+");
+            for (String tmpIPPort : ipPorts) {
+                if (StringUtils.isBlank(tmpIPPort)) {
+                    continue;
+                }
+                this.auditProxys.add(tmpIPPort.trim());
+            }
+        }
+        // read audit file path
+        tmpValue = this.props.get(KEY_AUDIT_FILE_PATH);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.auditFilePath = tmpValue.trim();
+        }
+        // read audit max cache rows
+        tmpValue = this.props.get(KEY_AUDIT_MAX_CACHE_ROWS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.auditMaxCacheRows = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_AUDIT_MAX_CACHE_ROWS);
+        }
+        // read audit format interval
+        tmpValue = this.props.get(KEY_AUDIT_FORMAT_INTERVAL_MS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.auditFormatInvlMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
+        }
+        // read whether response after save
+        tmpValue = this.props.get(KEY_RESPONSE_AFTER_SAVE);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.responseAfterSave = "TRUE".equalsIgnoreCase(tmpValue.trim());
+        }
+        // read max response after save timeout
+        tmpValue = this.props.get(KEY_MAX_RAS_TIMEOUT_MS);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.maxResAfterSaveTimeout = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_MAX_RAS_TIMEOUT_MS);
+        }
+        // read max bufferqueue size
+        tmpValue = this.props.get(KEY_MAX_BUFFERQUEUE_SIZE_KB);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.maxBufferQueueSizeKb = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB);
+        }
+        // read event handler
+        tmpValue = this.props.get(KEY_EVENT_HANDLER);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.eventHandler = tmpValue.trim();
+        }
+        // read cache cluster selector
+        tmpValue = this.props.get(KEY_CACHE_CLUSTER_SELECTOR);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.cacheClusterSelector = tmpValue.trim();
+        }
+        // read proxy node id
+        tmpValue = this.props.get(KEY_PROXY_NODE_ID);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.proxyNodeId = tmpValue.trim();
+        }
+        // read msg compress type
+        tmpValue = this.props.get(KEY_MSG_SENT_COMPRESS_TYPE);
+        if (StringUtils.isNotBlank(tmpValue)) {
+            this.msgCompressType = tmpValue.trim();
+        }
+        // read prometheus Http Port
+        tmpValue = this.props.get(KEY_PROMETHEUS_HTTP_PORT);
+        if (StringUtils.isNotEmpty(tmpValue)) {
+            this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_PROMETHEUS_HTTP_PORT);
+        }
+        // initialize the manager ip list parser
+        try {
+            Class<? extends IManagerIpListParser> ipListParserClass =
+                    (Class<? extends IManagerIpListParser>) Class.forName(this.managerType);
+            this.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
+            this.ipListParser.setCommonProperties(this.props);
+        } catch (Throwable t) {
+            LOG.error("Initial ipListParser Class {} failure, exit!", this.managerType, t);
+            System.exit(6);
+        }
+    }
+
+    private void chkRequiredFields(String requiredFieldKey) {
+        String fieldVal = props.get(requiredFieldKey);
+        if (fieldVal == null) {
+            LOG.error("Missing mandatory field {} in {}, exit!",
+                    requiredFieldKey, COMMON_CONFIG_FILE_NAME);
+            System.exit(4);
+        }
+        if (StringUtils.isBlank(fieldVal)) {
+            LOG.error("Required {} field value is blank in {}, exit!",
+                    requiredFieldKey, COMMON_CONFIG_FILE_NAME);
+            System.exit(5);
+        }
+    }
+
+    private boolean loadConfigFile() {
+        InputStream inStream = null;
+        try {
+            URL url = getClass().getClassLoader().getResource(COMMON_CONFIG_FILE_NAME);
+            inStream = url != null ? url.openStream() : null;
+            if (inStream == null) {
+                LOG.error("Fail to open {} as the input stream is null, exit!",
+                        COMMON_CONFIG_FILE_NAME);
+                System.exit(1);
+                return false;
+            }
+            String strKey;
+            String strVal;
+            Properties tmpProps = new Properties();
+            tmpProps.load(inStream);
+            props = new HashMap<>(tmpProps.size());
+            for (Map.Entry<Object, Object> entry : tmpProps.entrySet()) {
+                if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+                    continue;
+                }
+                strKey = (String) entry.getKey();
+                strVal = (String) entry.getValue();
+                if (StringUtils.isBlank(strKey) || StringUtils.isBlank(strVal)) {
+                    continue;
+                }
+                props.put(strKey.trim(), strVal.trim());
+            }
+            LOG.info("Read success from {}, content is {}", COMMON_CONFIG_FILE_NAME, props);
+        } catch (Throwable e) {
+            LOG.error("Fail to load properties from {}, exit!",
+                    COMMON_CONFIG_FILE_NAME, e);
+            System.exit(2);
+            return false;
+        } finally {
+            if (null != inStream) {
+                try {
+                    inStream.close();
+                } catch (IOException e) {
+                    LOG.error("Fail to InputStream.close() for file {}, exit!",
+                            COMMON_CONFIG_FILE_NAME, e);
+                    System.exit(3);
+                }
+            }
+        }
+        return true;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 99390d659..c2fac48ce 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -54,8 +54,6 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
-
 /**
  * Config manager class.
  */
@@ -66,9 +64,7 @@ public class ConfigManager {
     public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
     private static volatile boolean isInit = false;
     private static ConfigManager instance = null;
-    private static volatile boolean enableWhitList = false;
 
-    private final PropertiesConfigHolder commonConfig = new PropertiesConfigHolder("common.properties");
     private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
     private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
     private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
@@ -95,11 +91,6 @@ public class ConfigManager {
                 for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
                     holder.loadFromFileToHolder();
                 }
-                // get enable whitelist status
-                String strEnable =
-                        instance.getCommonProperties().get(ConfigConstants.ENABLE_WHITELIST);
-                enableWhitList = StringUtils.isNotBlank(strEnable)
-                        && "TRUE".equalsIgnoreCase(strEnable);
                 ReloadConfigWorker reloadProperties = ReloadConfigWorker.create(instance);
                 reloadProperties.setDaemon(true);
                 reloadProperties.start();
@@ -135,13 +126,14 @@ public class ConfigManager {
     }
 
     public boolean needChkIllegalIP() {
-        return (!blackListConfig.isEmptyConfig() || enableWhitList);
+        return (!blackListConfig.isEmptyConfig()
+                || CommonConfigHolder.getInstance().isEnableWhiteList());
     }
 
     public boolean isIllegalIP(String strRemoteIP) {
         return strRemoteIP == null
                 || blackListConfig.isContain(strRemoteIP)
-                || (enableWhitList && !whiteListConfig.isContain(strRemoteIP));
+                || (CommonConfigHolder.getInstance().isEnableWhiteList() && !whiteListConfig.isContain(strRemoteIP));
     }
 
     public boolean addMxProperties(Map<String, String> result) {
@@ -266,10 +258,6 @@ public class ConfigManager {
         return groupIdConfig.getGroupIdEnableMappingProperties();
     }
 
-    public Map<String, String> getCommonProperties() {
-        return commonConfig.getHolder();
-    }
-
     public PropertiesConfigHolder getTopicConfig() {
         return topicConfig;
     }
@@ -320,16 +308,7 @@ public class ConfigManager {
         }
 
         private long getSleepTime() {
-            String sleepTimeInMsStr = configManager.getCommonProperties().get(CONFIG_CHECK_INTERVAL);
-            long sleepTimeInMs = 10000;
-            try {
-                if (sleepTimeInMsStr != null) {
-                    sleepTimeInMs = Long.parseLong(sleepTimeInMsStr);
-                }
-            } catch (Exception e) {
-                LOG.info("ignored exception ", e);
-            }
-            return sleepTimeInMs + getRandom(0, 5000);
+            return CommonConfigHolder.getInstance().getConfigChkInvlMs() + getRandom(0, 5000);
         }
 
         public void close() {
@@ -431,15 +410,14 @@ public class ConfigManager {
 
         private void checkRemoteConfig() {
             try {
-                String managerHosts = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
-                String proxyClusterName = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_NAME);
-                String proxyClusterTag = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_TAG);
-                if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
+                List<String> mgrHostPorts = CommonConfigHolder.getInstance().getManagerHosts();
+                String proxyClusterName = CommonConfigHolder.getInstance().getClusterName();
+                String proxyClusterTag = CommonConfigHolder.getInstance().getClusterTag();
+                if (mgrHostPorts.isEmpty() || StringUtils.isEmpty(proxyClusterName)) {
                     return;
                 }
-                String[] hostList = StringUtils.split(managerHosts, ",");
-                for (String host : hostList) {
-                    if (checkWithManager(host, proxyClusterName, proxyClusterTag)) {
+                for (String hostPort : mgrHostPorts) {
+                    if (checkWithManager(hostPort, proxyClusterName, proxyClusterTag)) {
                         break;
                     }
                 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
index 684e11f58..e6ccb72aa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.dataproxy.config;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -29,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
  */
 public class DefaultManagerIpListParser implements IManagerIpListParser {
 
-    private Map<String, String> commonProperties;
+    List<String> managerIpList = new ArrayList<>();
 
     /**
      * setCommonProperties
@@ -38,19 +37,28 @@ public class DefaultManagerIpListParser implements IManagerIpListParser {
      */
     @Override
     public void setCommonProperties(Map<String, String> commonProperties) {
-        this.commonProperties = commonProperties;
+        String managerHosts = commonProperties.get(CommonConfigHolder.KEY_MANAGER_HOSTS);
+        if (StringUtils.isBlank(managerHosts)) {
+            return;
+        }
+        String[] hostPortList = StringUtils.split(managerHosts,
+                CommonConfigHolder.KEY_MANAGER_HOSTS_SEPARATOR);
+        for (String hostport : hostPortList) {
+            if (StringUtils.isBlank(hostport)) {
+                continue;
+            }
+            managerIpList.add(hostport.trim());
+        }
     }
 
     /**
      * getIpList
      * 
-     * @return
+     * @return manager ip-port list
      */
     @Override
     public List<String> getIpList() {
-        String managerHosts = this.commonProperties.get(KEY_MANAGER_HOSTS);
-        String[] hostList = StringUtils.split(managerHosts, SEPARATOR);
-        return Arrays.asList(hostList);
+        return managerIpList;
     }
 
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
index ad8371d1c..8a5c88290 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
@@ -25,10 +25,6 @@ import java.util.Map;
  */
 public interface IManagerIpListParser {
 
-    String KEY_MANAGER_HOSTS = "manager.hosts";
-    String SEPARATOR = ",";
-    String KEY_MANAGER_TYPE = "manager.type";
-
     void setCommonProperties(Map<String, String> commonProperties);
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 757f278fa..1013250c3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -19,7 +19,6 @@ package org.apache.inlong.dataproxy.config;
 
 import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -40,7 +39,6 @@ import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.ProxySink;
 import org.apache.inlong.common.pojo.dataproxy.ProxySource;
 import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.utils.HttpUtils;
 import org.slf4j.Logger;
@@ -63,10 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class RemoteConfigManager implements IRepository {
 
-    public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
-    private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
     private static final char FLUME_SEPARATOR = '.';
-    private static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
     private static final Gson GSON = new Gson();
@@ -80,7 +75,6 @@ public class RemoteConfigManager implements IRepository {
     private long reloadInterval;
     private Timer reloadTimer;
 
-    private IManagerIpListParser ipListParser;
     private CloseableHttpClient httpClient;
 
     // flume properties
@@ -106,15 +100,7 @@ public class RemoteConfigManager implements IRepository {
             if (!isInit) {
                 instance = new RemoteConfigManager();
                 try {
-                    String strReloadInterval = CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
-                    instance.reloadInterval = NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
-
-                    String ipListParserType = CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE,
-                            DefaultManagerIpListParser.class.getName());
-                    Class<? extends IManagerIpListParser> ipListParserClass;
-                    ipListParserClass = (Class<? extends IManagerIpListParser>) Class
-                            .forName(ipListParserType);
-                    instance.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
+                    instance.reloadInterval = CommonConfigHolder.getInstance().getConfigChkInvlMs();
 
                     SecureRandom random = new SecureRandom(String.valueOf(System.currentTimeMillis()).getBytes());
                     instance.managerIpListIndex.set(random.nextInt());
@@ -150,14 +136,13 @@ public class RemoteConfigManager implements IRepository {
      */
     public void reload() {
         LOGGER.info("start to reload config");
-        String proxyClusterName = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
-        String proxyClusterTag = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+        String proxyClusterName = CommonConfigHolder.getInstance().getClusterName();
+        String proxyClusterTag = CommonConfigHolder.getInstance().getClusterTag();
         if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(proxyClusterTag)) {
             return;
         }
 
-        this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
-        List<String> managerIpList = this.ipListParser.getIpList();
+        List<String> managerIpList = CommonConfigHolder.getInstance().getManagerHosts();
         if (managerIpList == null || managerIpList.size() == 0) {
             return;
         }
@@ -263,7 +248,7 @@ public class RemoteConfigManager implements IRepository {
         if (currentClusterConfig != null) {
             return currentClusterConfig.getProxyCluster().getName();
         }
-        return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
+        return CommonConfigHolder.getInstance().getClusterName();
     }
 
     /**
@@ -274,7 +259,7 @@ public class RemoteConfigManager implements IRepository {
         if (currentClusterConfig != null) {
             return currentClusterConfig.getProxyCluster().getSetName();
         }
-        return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+        return CommonConfigHolder.getInstance().getClusterTag();
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
deleted file mode 100644
index 1d4fe3990..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
-import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * CommonPropertiesHolder
- */
-public class CommonPropertiesHolder {
-
-    public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
-    public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
-    public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
-    public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
-    public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
-    public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
-    public static final String KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
-    public static final long DEFAULT_MAX_RESPONSE_TIMEOUT_MS = 10000L;
-
-    private static Map<String, String> props;
-
-    private static long auditFormatInterval = 60000L;
-    private static boolean isResponseAfterSave = DEFAULT_RESPONSE_AFTER_SAVE;
-    private static long maxResponseTimeout = DEFAULT_MAX_RESPONSE_TIMEOUT_MS;
-
-    /**
-     * init
-     */
-    private static void init() {
-        synchronized (KEY_COMMON_PROPERTIES) {
-            if (props == null) {
-                props = new ConcurrentHashMap<>();
-                String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
-                loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER : loaderClassName;
-                try {
-                    Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
-                    Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
-                    if (loaderObject instanceof CommonPropertiesLoader) {
-                        CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
-                        props.putAll(loader.load());
-                        LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
-                        auditFormatInterval = NumberUtils
-                                .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
-                        isResponseAfterSave = BooleanUtils
-                                .toBoolean(CommonPropertiesHolder.getString(KEY_RESPONSE_AFTER_SAVE));
-                        maxResponseTimeout = CommonPropertiesHolder.getLong(KEY_MAX_RESPONSE_TIMEOUT_MS,
-                                DEFAULT_MAX_RESPONSE_TIMEOUT_MS);
-                    }
-                } catch (Throwable t) {
-                    LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
-                            loaderClassName, t.getMessage());
-                    LOG.error(t.getMessage(), t);
-                }
-
-            }
-        }
-    }
-
-    /**
-     * get props
-     *
-     * @return the props
-     */
-    public static Map<String, String> get() {
-        synchronized (KEY_COMMON_PROPERTIES) {
-            if (props != null) {
-                return props;
-            }
-        }
-        init();
-        return props;
-    }
-
-    /**
-     * Gets value mapped to key, returning defaultValue if unmapped.
-     *
-     * @param key to be found
-     * @param defaultValue returned if key is unmapped
-     * @return value associated with key
-     */
-    public static String getString(String key, String defaultValue) {
-        return get().getOrDefault(key, defaultValue);
-    }
-
-    /**
-     * Gets value mapped to key, returning null if unmapped.
-     *
-     * @param key to be found
-     * @return value associated with key or null if unmapped
-     */
-    public static String getString(String key) {
-        return get().get(key);
-    }
-
-    /**
-     * getStringFromContext
-     *
-     * @param context
-     * @param key
-     * @param defaultValue
-     * @return
-     */
-    public static String getStringFromContext(Context context, String key, String defaultValue) {
-        String value = context.getString(key);
-        value = (value != null) ? value : props.getOrDefault(key, defaultValue);
-        return value;
-    }
-
-    /**
-     * Gets value mapped to key, returning defaultValue if unmapped.
-     *
-     * @param key to be found
-     * @param defaultValue returned if key is unmapped
-     * @return value associated with key
-     */
-    public static Integer getInteger(String key, Integer defaultValue) {
-        String value = get().get(key);
-        if (value != null) {
-            return Integer.valueOf(Integer.parseInt(value.trim()));
-        }
-        return defaultValue;
-    }
-
-    /**
-     * Gets value mapped to key, returning null if unmapped.
-     * <p>
-     * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
-     * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
-     * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
-     * </p>
-     *
-     * @param key to be found
-     * @return value associated with key or null if unmapped
-     */
-    public static Integer getInteger(String key) {
-        return getInteger(key, null);
-    }
-
-    /**
-     * Gets value mapped to key, returning defaultValue if unmapped.
-     *
-     * @param key to be found
-     * @param defaultValue returned if key is unmapped
-     * @return value associated with key
-     */
-    public static Long getLong(String key, Long defaultValue) {
-        String value = get().get(key);
-        if (value != null) {
-            return Long.valueOf(Long.parseLong(value.trim()));
-        }
-        return defaultValue;
-    }
-
-    /**
-     * Gets value mapped to key, returning null if unmapped.
-     * <p>
-     * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
-     * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
-     * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
-     * </p>
-     *
-     * @param key to be found
-     * @return value associated with key or null if unmapped
-     */
-    public static Long getLong(String key) {
-        return getLong(key, null);
-    }
-
-    /**
-     * getAuditFormatInterval
-     *
-     * @return
-     */
-    public static long getAuditFormatInterval() {
-        return auditFormatInterval;
-    }
-
-    /**
-     * isResponseAfterSave
-     *
-     * @return
-     */
-    public static boolean isResponseAfterSave() {
-        return isResponseAfterSave;
-    }
-
-    /**
-     * get maxResponseTimeout
-     *
-     * @return the maxResponseTimeout
-     */
-    public static long getMaxResponseTimeout() {
-        return maxResponseTimeout;
-    }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
deleted file mode 100644
index 2e2c7c63a..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.loader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Class resource common properties loader
- */
-public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
-    private static final String FILE_NAME = "common.properties";
-
-    /**
-     * load properties
-     */
-    @Override
-    public Map<String, String> load() {
-        return this.loadProperties();
-    }
-
-    protected Map<String, String> loadProperties() {
-        Map<String, String> result = new ConcurrentHashMap<>();
-        URL resource = getClass().getClassLoader().getResource(FILE_NAME);
-        try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
-            Properties props = new Properties();
-            props.load(inStream);
-            for (Map.Entry<Object, Object> entry : props.entrySet()) {
-                result.put((String) entry.getKey(), (String) entry.getValue());
-            }
-        } catch (Exception e) {
-            LOG.error("fail to load properties from file ={}", FILE_NAME, e);
-        }
-        return result;
-    }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
deleted file mode 100644
index 92d8f7061..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.loader;
-
-import java.util.Map;
-
-/**
- * Interface of common properties loader
- */
-public interface CommonPropertiesLoader {
-
-    /**
-     * load
-     *
-     * @return the configuration map
-     */
-    Map<String, String> load();
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 33793a4ff..0dd453e3e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -95,16 +95,6 @@ public class ConfigConstants {
     public static final String L5_ID_KEY = "l5id";
     public static final String SET_KEY = "set";
     public static final String CLUSTER_ID_KEY = "clusterId";
-    public static final String MANAGER_HOST = "manager.hosts";
-    public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
-    public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
-    public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
-    public static final String PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
-    public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
-    public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
-    public static final String SOURCE_NO_TOPIC_ACCEPT = "source.topic.notfound.accept";
-    public static final String SINK_NO_TOPIC_RESEND = "sink.topic.notfound.resend";
-    public static final String ENABLE_WHITELIST = "proxy.enable.whitelist";
 
     public static final String DECODER_BODY = "body";
     public static final String DECODER_TOPICKEY = "topic_key";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 72f753680..f45048bf1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -36,6 +36,7 @@ import org.apache.inlong.common.heartbeat.GroupHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.common.heartbeat.StreamHeartbeat;
 import org.apache.inlong.dataproxy.config.AuthUtils;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -54,12 +55,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 @Slf4j
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
-    public static final String DEFAULT_CLUSTER_TAG = "default_cluster";
-    public static final String DEFAULT_CLUSTER_NAME = "default_dataproxy";
-    public static final String DEFAULT_CLUSTER_INCHARGES = "admin";
-    // predefined format of ext tag: {key}={value}
-    public static final String DEFAULT_CLUSTER_EXT_TAG = "default=true";
-
     private final CloseableHttpClient httpClient;
     private final Gson gson;
 
@@ -88,10 +83,20 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         if (null == heartbeat) {
             return;
         }
-        ConfigManager configManager = ConfigManager.getInstance();
-        final String managerHost = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
+        List<String> mgrHostPorts = CommonConfigHolder.getInstance().getManagerHosts();
+        if (mgrHostPorts.isEmpty()) {
+            return;
+        }
+        for (String managerHost : mgrHostPorts) {
+            if (sendHeartBeatMsg(managerHost, heartbeat)) {
+                return;
+            }
+        }
+    }
+
+    private boolean sendHeartBeatMsg(String mgrHostPort, HeartbeatMsg heartbeat) {
         final String url =
-                "http://" + managerHost + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_HEARTBEAT_REPORT;
+                "http://" + mgrHostPort + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_HEARTBEAT_REPORT;
         try {
             HttpPost post = new HttpPost(url);
             post.addHeader(HttpHeaders.CONNECTION, "close");
@@ -102,15 +107,18 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
             post.setEntity(stringEntity);
             CloseableHttpResponse response = httpClient.execute(post);
             String isSuccess = EntityUtils.toString(response.getEntity());
-            if (StringUtils.isNotEmpty(isSuccess)
-                    && response.getStatusLine().getStatusCode() == 200) {
-                if (log.isDebugEnabled()) {
-                    log.debug("reportHeartbeat url {}, heartbeat: {}, return str {}", url, body, isSuccess);
+            if (response.getStatusLine().getStatusCode() == 200) {
+                if (StringUtils.isNotEmpty(isSuccess)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("reportHeartbeat url {}, heartbeat: {}, return str {}", url, body, isSuccess);
+                    }
                 }
+                return true;
             }
         } catch (Exception ex) {
             log.error("reportHeartbeat failed for url {}", url, ex);
         }
+        return false;
     }
 
     private synchronized CloseableHttpClient constructHttpClient() {
@@ -139,15 +147,10 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
         heartbeatMsg.setReportTime(System.currentTimeMillis());
         heartbeatMsg.setLoad(0xffff);
-        Map<String, String> commonProperties = configManager.getCommonProperties();
-        heartbeatMsg.setClusterTag(commonProperties.getOrDefault(
-                ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG));
-        heartbeatMsg.setClusterName(commonProperties.getOrDefault(
-                ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
-        heartbeatMsg.setInCharges(commonProperties.getOrDefault(
-                ConfigConstants.PROXY_CLUSTER_INCHARGES, DEFAULT_CLUSTER_INCHARGES));
-        heartbeatMsg.setExtTag(commonProperties.getOrDefault(
-                ConfigConstants.PROXY_CLUSTER_EXT_TAG, DEFAULT_CLUSTER_EXT_TAG));
+        heartbeatMsg.setClusterTag(CommonConfigHolder.getInstance().getClusterTag());
+        heartbeatMsg.setClusterName(CommonConfigHolder.getInstance().getClusterName());
+        heartbeatMsg.setInCharges(CommonConfigHolder.getInstance().getClusterIncharges());
+        heartbeatMsg.setExtTag(CommonConfigHolder.getInstance().getClusterExtTag());
 
         Map<String, String> groupIdMappings = configManager.getGroupIdMappingProperties();
         Map<String, Map<String, String>> streamIdMappings = configManager.getStreamIdMappingProperties();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 7faa75d64..70267cd59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
-import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -77,11 +77,7 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
                     statIntervalSec, maxMonitorCnt);
         }
         // register metrics
-        ConfigManager configManager = ConfigManager.getInstance();
-        String clusterId =
-                configManager.getCommonProperties().getOrDefault(
-                        ConfigConstants.PROXY_CLUSTER_NAME,
-                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+        String clusterId = CommonConfigHolder.getInstance().getClusterName();
         this.metricItemSet =
                 new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
         MetricRegister.register(metricItemSet);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
index 7187ad760..644aef6ad 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -26,7 +26,7 @@ import org.apache.inlong.common.metric.Dimension;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItem;
 import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.Constants;
 
@@ -144,7 +144,7 @@ public class DataProxyMetricItem extends MetricItem {
      */
     public static void fillAuditFormatTime(Event event, Map<String, String> dimensions) {
         long msgTime = (event != null) ? AuditUtils.getLogTime(event) : System.currentTimeMillis();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long auditFormatTime = msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index 9225760f0..6a3152a99 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -24,7 +24,7 @@ import org.apache.flume.Event;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItemSet;
 import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 
 /**
@@ -116,7 +116,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                 event.getHeaders().get(AttributeConstants.DATA_TIME));
         long msgCount = NumberUtils.toLong(
                 event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-        long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long auditFormatTime =
+                dataTime - dataTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         if (isSource) {
             dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, name);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 664c0340d..703a466d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,21 +17,16 @@
 
 package org.apache.inlong.dataproxy.metrics.audit;
 
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
 import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
 import org.apache.inlong.dataproxy.utils.InLongMsgVer;
-
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
 
 /**
@@ -39,39 +34,21 @@ import java.util.Map;
  */
 public class AuditUtils {
 
-    public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
-    public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
-    public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
-    public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
-    public static final String AUDIT_KEY_PROXYS = "audit.proxys";
-    public static final String AUDIT_KEY_IS_AUDIT = "audit.enable";
-
     public static final int AUDIT_ID_DATAPROXY_READ_SUCCESS = 5;
     public static final int AUDIT_ID_DATAPROXY_SEND_SUCCESS = 6;
 
-    private static boolean IS_AUDIT = true;
-
     /**
      * Init audit
      */
     public static void initAudit() {
-        // IS_AUDIT
-        IS_AUDIT = BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
-        if (IS_AUDIT) {
+        if (CommonConfigHolder.getInstance().isEnableAudit()) {
             // AuditProxy
-            String strIpPorts = CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
-            HashSet<String> proxys = new HashSet<>();
-            if (!StringUtils.isBlank(strIpPorts)) {
-                String[] ipPorts = strIpPorts.split("\\s+");
-                Collections.addAll(proxys, ipPorts);
-            }
-            AuditOperator.getInstance().setAuditProxy(proxys);
+            AuditOperator.getInstance().setAuditProxy(
+                    CommonConfigHolder.getInstance().getAuditProxys());
             // AuditConfig
-            String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
-            int maxCacheRow = NumberUtils.toInt(
-                    CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
-                    AUDIT_DEFAULT_MAX_CACHE_ROWS);
-            AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+            AuditConfig auditConfig = new AuditConfig(
+                    CommonConfigHolder.getInstance().getAuditFilePath(),
+                    CommonConfigHolder.getInstance().getAuditMaxCacheRows());
             AuditOperator.getInstance().setAuditConfig(auditConfig);
         }
     }
@@ -80,7 +57,7 @@ public class AuditUtils {
      * Add audit data
      */
     public static void add(int auditID, Event event) {
-        if (!IS_AUDIT || event == null) {
+        if (!CommonConfigHolder.getInstance().isEnableAudit() || event == null) {
             return;
         }
         Map<String, String> headers = event.getHeaders();
@@ -138,7 +115,7 @@ public class AuditUtils {
      * Get AuditFormatTime
      */
     public static long getAuditFormatTime(long msgTime) {
-        return msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        return msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
     }
 
     /**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
index 076f37ecc..58cef4173 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -47,8 +47,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.inlong.common.metric.MetricValue;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.common.metric.MetricItemValue;
 import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +63,6 @@ import io.prometheus.client.exporter.HTTPServer;
  */
 public class PrometheusMetricListener extends Collector implements MetricListener {
 
-    public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
-    public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
     public static final String DEFAULT_DIMENSION_LABEL = "dimension";
     private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
     protected HTTPServer httpServer;
@@ -79,7 +76,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
      * Constructor
      */
     public PrometheusMetricListener() {
-        this.metricName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        this.metricName = CommonConfigHolder.getInstance().getClusterName();
         this.metricItem = new DataProxyMetricItem();
         this.metricItem.clusterId = metricName;
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -109,9 +106,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
         metricValueMap.put(M_NODE_DURATION, metricItem.nodeDuration);
         metricValueMap.put(M_WHOLE_DURATION, metricItem.wholeDuration);
 
-        int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
         try {
-            this.httpServer = new HTTPServer(httpPort);
+            this.httpServer = new HTTPServer(CommonConfigHolder.getInstance().getPrometheusHttpPort());
             this.register();
         } catch (IOException e) {
             LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 98a8c21a7..edc8c885a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -47,8 +47,8 @@ import org.apache.flume.node.StaticZooKeeperConfigurationProvider;
 import org.apache.flume.util.SSLUtil;
 import org.apache.inlong.common.config.IDataProxyConfigHolder;
 import org.apache.inlong.common.metric.MetricObserver;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.heartbeat.HeartbeatManager;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.sdk.commons.admin.AdminTask;
@@ -210,7 +210,7 @@ public class Application {
                 }
             }
             // metrics
-            MetricObserver.init(CommonPropertiesHolder.get());
+            MetricObserver.init(CommonConfigHolder.getInstance().getProperties());
             // audit
             AuditUtils.initAudit();
 
@@ -236,7 +236,7 @@ public class Application {
      * Start by Manager config
      */
     private static void startByManagerConf(CommandLine commandLine) {
-        String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        String proxyName = CommonConfigHolder.getInstance().getClusterName();
         ManagerPropsConfigProvider configurationProvider = new ManagerPropsConfigProvider(proxyName);
         Application application = new Application();
         application.handleConfigurationEvent(configurationProvider.getConfiguration());
@@ -267,7 +267,7 @@ public class Application {
                 supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
             }
             // start admin task
-            this.adminTask = new AdminTask(new Context(CommonPropertiesHolder.get()));
+            this.adminTask = new AdminTask(new Context(CommonConfigHolder.getInstance().getProperties()));
             this.adminTask.start();
             HeartbeatManager heartbeatManager = new HeartbeatManager();
             heartbeatManager.start();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index a28493bad..8641d0606 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -21,7 +21,7 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -45,10 +45,7 @@ public class SinkContext {
     public static final String KEY_MAX_THREADS = "maxThreads";
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
-    public static final String KEY_EVENT_HANDLER = "eventHandler";
     public static final String KEY_MESSAGE_QUEUE_HANDLER = "messageQueueHandler";
-    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
-    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
 
     protected final String clusterId;
     protected final String sinkName;
@@ -70,7 +67,7 @@ public class SinkContext {
         this.sinkName = sinkName;
         this.sinkContext = context;
         this.channel = channel;
-        this.clusterId = CommonPropertiesHolder.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+        this.clusterId = CommonConfigHolder.getInstance().getClusterName();
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
@@ -199,8 +196,7 @@ public class SinkContext {
      */
     public EventHandler createEventHandler() {
         // IEventHandler
-        String eventHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_HANDLER,
-                DefaultEventHandler.class.getName());
+        String eventHandlerClass = CommonConfigHolder.getInstance().getEventHandler();
         try {
             Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
             Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
@@ -241,9 +237,6 @@ public class SinkContext {
      * @return
      */
     public static BufferQueue<BatchPackProfile> createBufferQueue() {
-        int maxBufferQueueSizeKb = CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
-                DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
-        BufferQueue<BatchPackProfile> dispatchQueue = new BufferQueue<BatchPackProfile>(maxBufferQueueSizeKb);
-        return dispatchQueue;
+        return new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 245971db1..8308af8ec 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -22,8 +22,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -43,7 +43,6 @@ public class MessageQueueZoneSinkContext extends SinkContext {
     public static final String KEY_NODE_ID = "nodeId";
     public static final String PREFIX_PRODUCER = "producer.";
     public static final String KEY_COMPRESS_TYPE = "compressType";
-    public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
 
     private final BufferQueue<BatchPackProfile> dispatchQueue;
 
@@ -63,18 +62,18 @@ public class MessageQueueZoneSinkContext extends SinkContext {
         super(sinkName, context, channel);
         this.dispatchQueue = dispatchQueue;
         // proxyClusterId
-        this.proxyClusterId = CommonPropertiesHolder.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+        this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
         // nodeId
-        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
+        this.nodeId = CommonConfigHolder.getInstance().getProxyNodeId();
         // compressionType
-        String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
-                INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
+        String strCompressionType = CommonConfigHolder.getInstance().getMsgCompressType();
         this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
         // producerContext
         Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
         this.producerContext = new Context(producerParams);
         // idTopicHolder
-        Context commonPropertiesContext = new Context(CommonPropertiesHolder.get());
+        Context commonPropertiesContext =
+                new Context(CommonConfigHolder.getInstance().getProperties());
         this.idTopicHolder = new IdTopicConfigHolder();
         this.idTopicHolder.configure(commonPropertiesContext);
         // cacheHolder
@@ -185,7 +184,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
         final long currentTime = System.currentTimeMillis();
         currentRecord.getEvents().forEach(event -> {
             long msgTime = event.getMsgTime();
-            long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+            long auditFormatTime =
+                    msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
             dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
             DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
             if (result) {
@@ -220,7 +220,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, mqName);
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
         long msgTime = currentRecord.getDispatchTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long auditFormatTime =
+                msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = currentRecord.getCount();
@@ -244,7 +245,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "-");
         long msgTime = System.currentTimeMillis();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long auditFormatTime =
+                msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         metricItem.sendFailCount.incrementAndGet();
@@ -279,14 +281,13 @@ public class MessageQueueZoneSinkContext extends SinkContext {
      * createCacheClusterSelector
      */
     public CacheClusterSelector createCacheClusterSelector() {
-        String strSelectorClass = CommonPropertiesHolder.getString(KEY_CACHE_CLUSTER_SELECTOR,
-                AllCacheClusterSelector.class.getName());
+        String strSelectorClass = CommonConfigHolder.getInstance().getCacheClusterSelector();
         try {
             Class<?> selectorClass = ClassUtils.getClass(strSelectorClass);
             Object selectorObject = selectorClass.getDeclaredConstructor().newInstance();
             if (selectorObject instanceof Configurable) {
                 Configurable configurable = (Configurable) selectorObject;
-                configurable.configure(new Context(CommonPropertiesHolder.get()));
+                configurable.configure(new Context(CommonConfigHolder.getInstance().getProperties()));
             }
             if (selectorObject instanceof CacheClusterSelector) {
                 CacheClusterSelector selector = (CacheClusterSelector) selectorObject;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 64612e79e..9eaebb56f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -36,7 +36,7 @@ import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
-import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
@@ -168,11 +168,7 @@ public abstract class BaseSource
         }
         super.start();
         // initial metric item set
-        ConfigManager configManager = ConfigManager.getInstance();
-        String clusterId =
-                configManager.getCommonProperties().getOrDefault(
-                        ConfigConstants.PROXY_CLUSTER_NAME,
-                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+        String clusterId = CommonConfigHolder.getInstance().getClusterName();
         this.metricItemSet =
                 new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
         MetricRegister.register(metricItemSet);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index e31f5e40d..a846b3df7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -47,6 +47,7 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.dataproxy.base.SinkRspEvent;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -395,10 +396,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             }
             // check topic configure
             if (StringUtils.isEmpty(configTopic)) {
-                String acceptMsg =
-                        configManager.getCommonProperties().getOrDefault(
-                                ConfigConstants.SOURCE_NO_TOPIC_ACCEPT, "false");
-                if ("true".equalsIgnoreCase(acceptMsg)) {
+                if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                     configTopic = this.defaultTopic;
                 } else {
                     commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
index c9f4d178c..be8893997 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
@@ -23,8 +23,7 @@ import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Context;
 import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
@@ -84,8 +83,7 @@ public class SourceContext {
     public SourceContext(AbstractSource source, ChannelGroup allChannels, Context context) {
         this.source = source;
         this.allChannels = allChannels;
-        this.proxyClusterId = CommonPropertiesHolder.get()
-                .getOrDefault(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME, "unknown");
+        this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
         this.sourceId = source.getName();
         // metric
         this.metricItemSet = new DataProxyMetricItemSet(sourceId);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 4d7864993..80b433c10 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.dataproxy.source.tcp;
 
 import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.SourceContext;
@@ -147,10 +147,10 @@ public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
             this.responsePackage(ctx, ResultCode.SUCCUSS, packObject);
         }
         // process
-        if (!CommonPropertiesHolder.isResponseAfterSave()) {
-            this.processAndResponse(ctx, packObject, events);
-        } else {
+        if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
             this.processAndWaitingSave(ctx, packObject, events);
+        } else {
+            this.processAndResponse(ctx, packObject, events);
         }
     }
 
@@ -174,8 +174,8 @@ public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
             events.forEach(event -> {
                 this.addMetric(true, event.getBody().length, event);
             });
-            boolean awaitResult = callback.getLatch().await(CommonPropertiesHolder.getMaxResponseTimeout(),
-                    TimeUnit.MILLISECONDS);
+            boolean awaitResult = callback.getLatch().await(
+                    CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), TimeUnit.MILLISECONDS);
             if (!awaitResult) {
                 if (!callback.getHasResponsed().getAndSet(true)) {
                     this.responsePackage(ctx, ResultCode.ERR_REJECT, packObject);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
similarity index 58%
rename from inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
rename to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 77773cffc..7102fef1e 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -15,30 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.config.loader;
+package org.apache.inlong.dataproxy.config.holder;
 
 import org.apache.inlong.common.metric.MetricListener;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test for {@link ClassResourceCommonPropertiesLoader}
+ * Test for {@link CommonConfigHolder}
  */
-public class TestClassResourceCommonPropertiesLoader {
+public class TestCommonConfigHolder {
 
-    /**
-     * testResult
-     */
     @Test
-    public void testResult() {
-        // increase source
-        ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
-        Map<String, String> props = loader.load();
-        assertEquals("proxy_inlong5th_sz", props.get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME));
-        assertEquals("DataProxy", props.get(MetricListener.KEY_METRIC_DOMAINS));
+    public void testCase() {
+        Assert.assertEquals("proxy_inlong5th_sz",
+                CommonConfigHolder.getInstance().getClusterName());
+        Assert.assertTrue(CommonConfigHolder.getInstance().isEnableWhiteList());
+        assertEquals("DataProxy",
+                CommonConfigHolder.getInstance().getProperties().get(MetricListener.KEY_METRIC_DOMAINS));
     }
 }