You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/04/30 13:16:55 UTC
[inlong] branch master updated: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d0eb13483 [INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)
d0eb13483 is described below
commit d0eb13483eded8106883e1606e74b266cc118df4
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Apr 30 21:16:49 2023 +0800
[INLONG-7931][DataProxy] Optimize common.properties related control mechanism (#7942)
---
.../apache/inlong/dataproxy/config/AuthUtils.java | 10 +-
.../dataproxy/config/CommonConfigHolder.java | 477 +++++++++++++++++++++
.../inlong/dataproxy/config/ConfigManager.java | 42 +-
.../config/DefaultManagerIpListParser.java | 24 +-
.../dataproxy/config/IManagerIpListParser.java | 4 -
.../dataproxy/config/RemoteConfigManager.java | 27 +-
.../config/holder/CommonPropertiesHolder.java | 222 ----------
.../ClassResourceCommonPropertiesLoader.java | 60 ---
.../config/loader/CommonPropertiesLoader.java | 34 --
.../inlong/dataproxy/consts/ConfigConstants.java | 10 -
.../dataproxy/heartbeat/HeartbeatManager.java | 47 +-
.../inlong/dataproxy/http/HttpBaseSource.java | 8 +-
.../dataproxy/metrics/DataProxyMetricItem.java | 4 +-
.../dataproxy/metrics/DataProxyMetricItemSet.java | 5 +-
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 41 +-
.../prometheus/PrometheusMetricListener.java | 10 +-
.../apache/inlong/dataproxy/node/Application.java | 8 +-
.../inlong/dataproxy/sink/common/SinkContext.java | 15 +-
.../sink/mq/MessageQueueZoneSinkContext.java | 27 +-
.../apache/inlong/dataproxy/source/BaseSource.java | 8 +-
.../dataproxy/source/ServerMessageHandler.java | 6 +-
.../inlong/dataproxy/source/SourceContext.java | 6 +-
.../source/tcp/InlongTcpChannelHandler.java | 12 +-
.../TestCommonConfigHolder.java} | 26 +-
24 files changed, 601 insertions(+), 532 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
index 83f4be01b..3aa5661df 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/AuthUtils.java
@@ -18,12 +18,9 @@
package org.apache.inlong.dataproxy.config;
import org.apache.inlong.common.util.BasicAuth;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@ -32,10 +29,9 @@ public class AuthUtils {
* Generate http basic auth credential from configured secretId and secretKey
*/
public static String genBasicAuth() {
- Map<String, String> properties = ConfigManager.getInstance().getCommonProperties();
- String secretId = properties.get(ConfigConstants.MANAGER_AUTH_SECRET_ID);
- String secretKey = properties.get(ConfigConstants.MANAGER_AUTH_SECRET_KEY);
- return BasicAuth.genBasicAuthCredential(secretId, secretKey);
+ return BasicAuth.genBasicAuthCredential(
+ CommonConfigHolder.getInstance().getManagerAuthSecretId(),
+ CommonConfigHolder.getInstance().getManagerAuthSecretKey());
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
new file mode 100644
index 000000000..d6f3aec76
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
+import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * common.properties Configure Holder
+ */
+public class CommonConfigHolder {
+
+ public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
+ // configure file name
+ private static final String COMMON_CONFIG_FILE_NAME = "common.properties";
+ // **** allowed keys and default value, begin
+ // cluster tag
+ public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+ public static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
+ // cluster name
+ public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
+ public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
+ // cluster incharges
+ public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
+ public static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
+ // cluster exttag,
+ public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
+ // predefined format of ext tag: {key}={value}
+ public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
+ // manager type
+ public static final String KEY_MANAGER_TYPE = "manager.type";
+ public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName();
+ // manager hosts
+ public static final String KEY_MANAGER_HOSTS = "manager.hosts";
+ public static final String KEY_MANAGER_HOSTS_SEPARATOR = ",";
+ // manager auth secret id
+ public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
+ // manager auth secret key
+ public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
+ // configure file check interval
+ private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval";
+ public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 60000L;
+ // Whether to accept messages without mapping between groupId/streamId and topic
+ public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept";
+ public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+ // whether enable whitelist, optional field.
+ public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
+ public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+ // Audit fields
+ public static final String KEY_ENABLE_AUDIT = "audit.enable";
+ public static final boolean VAL_DEF_ENABLE_AUDIT = true;
+ public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+ public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+ public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
+ public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
+ public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
+ public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval";
+ public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+ // Whether response after save msg
+ public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+ public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
+ // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+ public static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
+ public static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
+ // max buffer queue size in Kb
+ public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+ public static final int VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+ // event handler
+ public static final String KEY_EVENT_HANDLER = "eventHandler";
+ public static final String VAL_DEF_EVENT_HANDLER = DefaultEventHandler.class.getName();
+ // cache cluster selector
+ public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
+ public static final String VAL_DEF_CACHE_CLUSTER_SELECTOR = AllCacheClusterSelector.class.getName();
+ // proxy node id
+ public static final String KEY_PROXY_NODE_ID = "nodeId";
+ public static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
+ // msg sent compress type
+ public static final String KEY_MSG_SENT_COMPRESS_TYPE = "compressType";
+ public static final String VAL_DEF_MSG_COMPRESS_TYPE = ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name();
+ // prometheus http port
+ public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+ public static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
+ // **** allowed keys and default value, end
+
+ // class instance
+ private static CommonConfigHolder instance = null;
+ private static volatile boolean isInit = false;
+ private Map<String, String> props;
+ // pre-read field values
+ private String clusterTag = VAL_DEF_CLUSTER_TAG;
+ private String clusterName = VAL_DEF_CLUSTER_NAME;
+ private String clusterIncharges = VAL_DEF_CLUSTER_INCHARGES;
+ private String clusterExtTag = VAL_DEF_CLUSTER_EXT_TAG;
+ private String managerType = VAL_DEF_MANAGER_TYPE;
+ private IManagerIpListParser ipListParser = null;
+ private String managerAuthSecretId = "";
+ private String managerAuthSecretKey = "";
+ private long configChkInvlMs = VAL_DEF_CONFIG_CHECK_INTERVAL_MS;
+ private boolean enableAudit = VAL_DEF_ENABLE_AUDIT;
+ private final HashSet<String> auditProxys = new HashSet<>();
+ private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
+ private int auditMaxCacheRows = VAL_DEF_AUDIT_MAX_CACHE_ROWS;
+ private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
+ private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
+ private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
+ private boolean noTopicAccept = VAL_DEF_NOTFOUND_TOPIC_ACCEPT;
+ private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
+ private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
+ private String eventHandler = VAL_DEF_EVENT_HANDLER;
+ private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
+ private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
+ private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
+ private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+
+ /**
+ * get instance for common.properties config manager
+ */
+ public static CommonConfigHolder getInstance() {
+ if (isInit && instance != null) {
+ return instance;
+ }
+ synchronized (CommonConfigHolder.class) {
+ if (!isInit) {
+ instance = new CommonConfigHolder();
+ if (instance.loadConfigFile()) {
+ instance.preReadFields();
+ }
+ isInit = true;
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Get the original attribute map
+ *
+ * Notice: only the non-pre-read fields need to be searched from the attribute map,
+ * the pre-read fields MUST be got according to the methods in the class.
+ */
+ public Map<String, String> getProperties() {
+ return this.props;
+ }
+
+ /**
+ * getStringFromContext
+ *
+ * @param context
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static String getStringFromContext(Context context, String key, String defaultValue) {
+ String value = context.getString(key);
+ value = (value != null) ? value : getInstance().getProperties().getOrDefault(key, defaultValue);
+ return value;
+ }
+
+ public String getClusterTag() {
+ return clusterTag;
+ }
+
+ public String getClusterName() {
+ return this.clusterName;
+ }
+
+ public String getClusterIncharges() {
+ return clusterIncharges;
+ }
+
+ public String getClusterExtTag() {
+ return clusterExtTag;
+ }
+
+ public long getConfigChkInvlMs() {
+ return configChkInvlMs;
+ }
+
+ public boolean isNoTopicAccept() {
+ return noTopicAccept;
+ }
+
+ public boolean isEnableWhiteList() {
+ return this.enableWhiteList;
+ }
+
+ public String getManagerType() {
+ return managerType;
+ }
+
+ public List<String> getManagerHosts() {
+ return this.ipListParser.getIpList();
+ }
+
+ public String getManagerAuthSecretId() {
+ return managerAuthSecretId;
+ }
+
+ public String getManagerAuthSecretKey() {
+ return managerAuthSecretKey;
+ }
+
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ public HashSet<String> getAuditProxys() {
+ return auditProxys;
+ }
+
+ public String getAuditFilePath() {
+ return auditFilePath;
+ }
+
+ public int getAuditMaxCacheRows() {
+ return auditMaxCacheRows;
+ }
+
+ public long getAuditFormatInvlMs() {
+ return auditFormatInvlMs;
+ }
+
+ public boolean isResponseAfterSave() {
+ return responseAfterSave;
+ }
+
+ public long getMaxResAfterSaveTimeout() {
+ return maxResAfterSaveTimeout;
+ }
+
+ public int getMaxBufferQueueSizeKb() {
+ return maxBufferQueueSizeKb;
+ }
+
+ public String getEventHandler() {
+ return eventHandler;
+ }
+
+ public String getCacheClusterSelector() {
+ return cacheClusterSelector;
+ }
+
+ public int getPrometheusHttpPort() {
+ return prometheusHttpPort;
+ }
+
+ public String getProxyNodeId() {
+ return proxyNodeId;
+ }
+
+ public String getMsgCompressType() {
+ return msgCompressType;
+ }
+
+ private void preReadFields() {
+ String tmpValue;
+ // read cluster tag
+ tmpValue = this.props.get(KEY_PROXY_CLUSTER_TAG);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.clusterTag = tmpValue.trim();
+ }
+ // read cluster name
+ tmpValue = this.props.get(KEY_PROXY_CLUSTER_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.clusterName = tmpValue.trim();
+ }
+ // read cluster incharges
+ tmpValue = this.props.get(KEY_PROXY_CLUSTER_INCHARGES);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.clusterIncharges = tmpValue.trim();
+ }
+ tmpValue = this.props.get(KEY_PROXY_CLUSTER_EXT_TAG);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.clusterExtTag = tmpValue.trim();
+ }
+ // read configure check interval
+ tmpValue = this.props.get(KEY_CONFIG_CHECK_INTERVAL_MS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.configChkInvlMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_CONFIG_CHECK_INTERVAL_MS);
+ }
+ // read whether accept msg without topic
+ tmpValue = this.props.get(KEY_NOTFOUND_TOPIC_ACCEPT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.noTopicAccept = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read enable whitelist
+ tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read manager type
+ tmpValue = this.props.get(KEY_MANAGER_TYPE);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.managerType = tmpValue.trim();
+ }
+ // read manager auth secret id
+ tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_ID);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.managerAuthSecretId = tmpValue.trim();
+ }
+ // read manager auth secret key
+ tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_KEY);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.managerAuthSecretKey = tmpValue.trim();
+ }
+ // read whether enable audit
+ tmpValue = this.props.get(KEY_ENABLE_AUDIT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read audit proxys
+ tmpValue = this.props.get(KEY_AUDIT_PROXYS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ String[] ipPorts = tmpValue.split("\\s+");
+ for (String tmpIPPort : ipPorts) {
+ if (StringUtils.isBlank(tmpIPPort)) {
+ continue;
+ }
+ this.auditProxys.add(tmpIPPort.trim());
+ }
+ }
+ // read audit file path
+ tmpValue = this.props.get(KEY_AUDIT_FILE_PATH);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.auditFilePath = tmpValue.trim();
+ }
+ // read audit max cache rows
+ tmpValue = this.props.get(KEY_AUDIT_MAX_CACHE_ROWS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.auditMaxCacheRows = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_AUDIT_MAX_CACHE_ROWS);
+ }
+ // read audit format interval
+ tmpValue = this.props.get(KEY_AUDIT_FORMAT_INTERVAL_MS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.auditFormatInvlMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
+ }
+ // read whether response after save
+ tmpValue = this.props.get(KEY_RESPONSE_AFTER_SAVE);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.responseAfterSave = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read max response after save timeout
+ tmpValue = this.props.get(KEY_MAX_RAS_TIMEOUT_MS);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.maxResAfterSaveTimeout = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_MAX_RAS_TIMEOUT_MS);
+ }
+ // read max bufferqueue size
+ tmpValue = this.props.get(KEY_MAX_BUFFERQUEUE_SIZE_KB);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.maxBufferQueueSizeKb = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB);
+ }
+ // read event handler
+ tmpValue = this.props.get(KEY_EVENT_HANDLER);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.eventHandler = tmpValue.trim();
+ }
+ // read cache cluster selector
+ tmpValue = this.props.get(KEY_CACHE_CLUSTER_SELECTOR);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.cacheClusterSelector = tmpValue.trim();
+ }
+ // read proxy node id
+ tmpValue = this.props.get(KEY_PROXY_NODE_ID);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.proxyNodeId = tmpValue.trim();
+ }
+ // read msg compress type
+ tmpValue = this.props.get(KEY_MSG_SENT_COMPRESS_TYPE);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.msgCompressType = tmpValue.trim();
+ }
+ // read prometheus Http Port
+ tmpValue = this.props.get(KEY_PROMETHEUS_HTTP_PORT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_PROMETHEUS_HTTP_PORT);
+ }
+ // initial ip parser
+ try {
+ Class<? extends IManagerIpListParser> ipListParserClass =
+ (Class<? extends IManagerIpListParser>) Class.forName(this.managerType);
+ this.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
+ this.ipListParser.setCommonProperties(this.props);
+ } catch (Throwable t) {
+ LOG.error("Initial ipListParser Class {} failure, exit!", this.managerType, t);
+ System.exit(6);
+ }
+ }
+
+ private void chkRequiredFields(String requiredFieldKey) {
+ String fieldVal = props.get(requiredFieldKey);
+ if (fieldVal == null) {
+ LOG.error("Missing mandatory field {} in {}, exit!",
+ requiredFieldKey, COMMON_CONFIG_FILE_NAME);
+ System.exit(4);
+ }
+ if (StringUtils.isBlank(fieldVal)) {
+ LOG.error("Required {} field value is blank in {}, exit!",
+ requiredFieldKey, COMMON_CONFIG_FILE_NAME);
+ System.exit(5);
+ }
+ }
+
+ private boolean loadConfigFile() {
+ InputStream inStream = null;
+ try {
+ URL url = getClass().getClassLoader().getResource(COMMON_CONFIG_FILE_NAME);
+ inStream = url != null ? url.openStream() : null;
+ if (inStream == null) {
+ LOG.error("Fail to open {} as the input stream is null, exit!",
+ COMMON_CONFIG_FILE_NAME);
+ System.exit(1);
+ return false;
+ }
+ String strKey;
+ String strVal;
+ Properties tmpProps = new Properties();
+ tmpProps.load(inStream);
+ props = new HashMap<>(tmpProps.size());
+ for (Map.Entry<Object, Object> entry : tmpProps.entrySet()) {
+ if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
+ strKey = (String) entry.getKey();
+ strVal = (String) entry.getValue();
+ if (StringUtils.isBlank(strKey) || StringUtils.isBlank(strVal)) {
+ continue;
+ }
+ props.put(strKey.trim(), strVal.trim());
+ }
+ LOG.info("Read success from {}, content is {}", COMMON_CONFIG_FILE_NAME, props);
+ } catch (Throwable e) {
+ LOG.error("Fail to load properties from {}, exit!",
+ COMMON_CONFIG_FILE_NAME, e);
+ System.exit(2);
+ return false;
+ } finally {
+ if (null != inStream) {
+ try {
+ inStream.close();
+ } catch (IOException e) {
+ LOG.error("Fail to InputStream.close() for file {}, exit!",
+ COMMON_CONFIG_FILE_NAME, e);
+ System.exit(3);
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 99390d659..c2fac48ce 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -54,8 +54,6 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
-
/**
* Config manager class.
*/
@@ -66,9 +64,7 @@ public class ConfigManager {
public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
private static volatile boolean isInit = false;
private static ConfigManager instance = null;
- private static volatile boolean enableWhitList = false;
- private final PropertiesConfigHolder commonConfig = new PropertiesConfigHolder("common.properties");
private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
@@ -95,11 +91,6 @@ public class ConfigManager {
for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
holder.loadFromFileToHolder();
}
- // get enable whitelist status
- String strEnable =
- instance.getCommonProperties().get(ConfigConstants.ENABLE_WHITELIST);
- enableWhitList = StringUtils.isNotBlank(strEnable)
- && "TRUE".equalsIgnoreCase(strEnable);
ReloadConfigWorker reloadProperties = ReloadConfigWorker.create(instance);
reloadProperties.setDaemon(true);
reloadProperties.start();
@@ -135,13 +126,14 @@ public class ConfigManager {
}
public boolean needChkIllegalIP() {
- return (!blackListConfig.isEmptyConfig() || enableWhitList);
+ return (!blackListConfig.isEmptyConfig()
+ || CommonConfigHolder.getInstance().isEnableWhiteList());
}
public boolean isIllegalIP(String strRemoteIP) {
return strRemoteIP == null
|| blackListConfig.isContain(strRemoteIP)
- || (enableWhitList && !whiteListConfig.isContain(strRemoteIP));
+ || (CommonConfigHolder.getInstance().isEnableWhiteList() && !whiteListConfig.isContain(strRemoteIP));
}
public boolean addMxProperties(Map<String, String> result) {
@@ -266,10 +258,6 @@ public class ConfigManager {
return groupIdConfig.getGroupIdEnableMappingProperties();
}
- public Map<String, String> getCommonProperties() {
- return commonConfig.getHolder();
- }
-
public PropertiesConfigHolder getTopicConfig() {
return topicConfig;
}
@@ -320,16 +308,7 @@ public class ConfigManager {
}
private long getSleepTime() {
- String sleepTimeInMsStr = configManager.getCommonProperties().get(CONFIG_CHECK_INTERVAL);
- long sleepTimeInMs = 10000;
- try {
- if (sleepTimeInMsStr != null) {
- sleepTimeInMs = Long.parseLong(sleepTimeInMsStr);
- }
- } catch (Exception e) {
- LOG.info("ignored exception ", e);
- }
- return sleepTimeInMs + getRandom(0, 5000);
+ return CommonConfigHolder.getInstance().getConfigChkInvlMs() + getRandom(0, 5000);
}
public void close() {
@@ -431,15 +410,14 @@ public class ConfigManager {
private void checkRemoteConfig() {
try {
- String managerHosts = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
- String proxyClusterName = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_NAME);
- String proxyClusterTag = configManager.getCommonProperties().get(ConfigConstants.PROXY_CLUSTER_TAG);
- if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
+ List<String> mgrHostPorts = CommonConfigHolder.getInstance().getManagerHosts();
+ String proxyClusterName = CommonConfigHolder.getInstance().getClusterName();
+ String proxyClusterTag = CommonConfigHolder.getInstance().getClusterTag();
+ if (mgrHostPorts.isEmpty() || StringUtils.isEmpty(proxyClusterName)) {
return;
}
- String[] hostList = StringUtils.split(managerHosts, ",");
- for (String host : hostList) {
- if (checkWithManager(host, proxyClusterName, proxyClusterTag)) {
+ for (String hostPort : mgrHostPorts) {
+ if (checkWithManager(hostPort, proxyClusterName, proxyClusterTag)) {
break;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
index 684e11f58..e6ccb72aa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/DefaultManagerIpListParser.java
@@ -17,10 +17,9 @@
package org.apache.inlong.dataproxy.config;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
/**
@@ -29,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
*/
public class DefaultManagerIpListParser implements IManagerIpListParser {
- private Map<String, String> commonProperties;
+ List<String> managerIpList = new ArrayList<>();
/**
* setCommonProperties
@@ -38,19 +37,28 @@ public class DefaultManagerIpListParser implements IManagerIpListParser {
*/
@Override
public void setCommonProperties(Map<String, String> commonProperties) {
- this.commonProperties = commonProperties;
+ String managerHosts = commonProperties.get(CommonConfigHolder.KEY_MANAGER_HOSTS);
+ if (StringUtils.isBlank(managerHosts)) {
+ return;
+ }
+ String[] hostPortList = StringUtils.split(managerHosts,
+ CommonConfigHolder.KEY_MANAGER_HOSTS_SEPARATOR);
+ for (String hostport : hostPortList) {
+ if (StringUtils.isBlank(hostport)) {
+ continue;
+ }
+ managerIpList.add(hostport.trim());
+ }
}
/**
* getIpList
*
- * @return
+ * @return manager ip-port list
*/
@Override
public List<String> getIpList() {
- String managerHosts = this.commonProperties.get(KEY_MANAGER_HOSTS);
- String[] hostList = StringUtils.split(managerHosts, SEPARATOR);
- return Arrays.asList(hostList);
+ return managerIpList;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
index ad8371d1c..8a5c88290 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/IManagerIpListParser.java
@@ -25,10 +25,6 @@ import java.util.Map;
*/
public interface IManagerIpListParser {
- String KEY_MANAGER_HOSTS = "manager.hosts";
- String SEPARATOR = ",";
- String KEY_MANAGER_TYPE = "manager.type";
-
void setCommonProperties(Map<String, String> commonProperties);
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 757f278fa..1013250c3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -19,7 +19,6 @@ package org.apache.inlong.dataproxy.config;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -40,7 +39,6 @@ import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.ProxySink;
import org.apache.inlong.common.pojo.dataproxy.ProxySource;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.HttpUtils;
import org.slf4j.Logger;
@@ -63,10 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RemoteConfigManager implements IRepository {
- public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
- private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
private static final char FLUME_SEPARATOR = '.';
- private static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
private static final Gson GSON = new Gson();
@@ -80,7 +75,6 @@ public class RemoteConfigManager implements IRepository {
private long reloadInterval;
private Timer reloadTimer;
- private IManagerIpListParser ipListParser;
private CloseableHttpClient httpClient;
// flume properties
@@ -106,15 +100,7 @@ public class RemoteConfigManager implements IRepository {
if (!isInit) {
instance = new RemoteConfigManager();
try {
- String strReloadInterval = CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
- instance.reloadInterval = NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
-
- String ipListParserType = CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE,
- DefaultManagerIpListParser.class.getName());
- Class<? extends IManagerIpListParser> ipListParserClass;
- ipListParserClass = (Class<? extends IManagerIpListParser>) Class
- .forName(ipListParserType);
- instance.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
+ instance.reloadInterval = CommonConfigHolder.getInstance().getConfigChkInvlMs();
SecureRandom random = new SecureRandom(String.valueOf(System.currentTimeMillis()).getBytes());
instance.managerIpListIndex.set(random.nextInt());
@@ -150,14 +136,13 @@ public class RemoteConfigManager implements IRepository {
*/
public void reload() {
LOGGER.info("start to reload config");
- String proxyClusterName = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
- String proxyClusterTag = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+ String proxyClusterName = CommonConfigHolder.getInstance().getClusterName();
+ String proxyClusterTag = CommonConfigHolder.getInstance().getClusterTag();
if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(proxyClusterTag)) {
return;
}
- this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
- List<String> managerIpList = this.ipListParser.getIpList();
+ List<String> managerIpList = CommonConfigHolder.getInstance().getManagerHosts();
if (managerIpList == null || managerIpList.size() == 0) {
return;
}
@@ -263,7 +248,7 @@ public class RemoteConfigManager implements IRepository {
if (currentClusterConfig != null) {
return currentClusterConfig.getProxyCluster().getName();
}
- return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
+ return CommonConfigHolder.getInstance().getClusterName();
}
/**
@@ -274,7 +259,7 @@ public class RemoteConfigManager implements IRepository {
if (currentClusterConfig != null) {
return currentClusterConfig.getProxyCluster().getSetName();
}
- return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+ return CommonConfigHolder.getInstance().getClusterTag();
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
deleted file mode 100644
index 1d4fe3990..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.holder;
-
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
-import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * CommonPropertiesHolder
- */
-public class CommonPropertiesHolder {
-
- public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
- public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
- public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
- public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
- public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
- public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
- public static final String KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
- public static final long DEFAULT_MAX_RESPONSE_TIMEOUT_MS = 10000L;
-
- private static Map<String, String> props;
-
- private static long auditFormatInterval = 60000L;
- private static boolean isResponseAfterSave = DEFAULT_RESPONSE_AFTER_SAVE;
- private static long maxResponseTimeout = DEFAULT_MAX_RESPONSE_TIMEOUT_MS;
-
- /**
- * init
- */
- private static void init() {
- synchronized (KEY_COMMON_PROPERTIES) {
- if (props == null) {
- props = new ConcurrentHashMap<>();
- String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
- loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER : loaderClassName;
- try {
- Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
- Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
- if (loaderObject instanceof CommonPropertiesLoader) {
- CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
- props.putAll(loader.load());
- LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
- auditFormatInterval = NumberUtils
- .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
- isResponseAfterSave = BooleanUtils
- .toBoolean(CommonPropertiesHolder.getString(KEY_RESPONSE_AFTER_SAVE));
- maxResponseTimeout = CommonPropertiesHolder.getLong(KEY_MAX_RESPONSE_TIMEOUT_MS,
- DEFAULT_MAX_RESPONSE_TIMEOUT_MS);
- }
- } catch (Throwable t) {
- LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
- loaderClassName, t.getMessage());
- LOG.error(t.getMessage(), t);
- }
-
- }
- }
- }
-
- /**
- * get props
- *
- * @return the props
- */
- public static Map<String, String> get() {
- synchronized (KEY_COMMON_PROPERTIES) {
- if (props != null) {
- return props;
- }
- }
- init();
- return props;
- }
-
- /**
- * Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
- */
- public static String getString(String key, String defaultValue) {
- return get().getOrDefault(key, defaultValue);
- }
-
- /**
- * Gets value mapped to key, returning null if unmapped.
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
- */
- public static String getString(String key) {
- return get().get(key);
- }
-
- /**
- * getStringFromContext
- *
- * @param context
- * @param key
- * @param defaultValue
- * @return
- */
- public static String getStringFromContext(Context context, String key, String defaultValue) {
- String value = context.getString(key);
- value = (value != null) ? value : props.getOrDefault(key, defaultValue);
- return value;
- }
-
- /**
- * Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
- */
- public static Integer getInteger(String key, Integer defaultValue) {
- String value = get().get(key);
- if (value != null) {
- return Integer.valueOf(Integer.parseInt(value.trim()));
- }
- return defaultValue;
- }
-
- /**
- * Gets value mapped to key, returning null if unmapped.
- * <p>
- * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
- * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
- * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
- * </p>
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
- */
- public static Integer getInteger(String key) {
- return getInteger(key, null);
- }
-
- /**
- * Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
- */
- public static Long getLong(String key, Long defaultValue) {
- String value = get().get(key);
- if (value != null) {
- return Long.valueOf(Long.parseLong(value.trim()));
- }
- return defaultValue;
- }
-
- /**
- * Gets value mapped to key, returning null if unmapped.
- * <p>
- * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
- * mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
- * return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
- * </p>
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
- */
- public static Long getLong(String key) {
- return getLong(key, null);
- }
-
- /**
- * getAuditFormatInterval
- *
- * @return
- */
- public static long getAuditFormatInterval() {
- return auditFormatInterval;
- }
-
- /**
- * isResponseAfterSave
- *
- * @return
- */
- public static boolean isResponseAfterSave() {
- return isResponseAfterSave;
- }
-
- /**
- * get maxResponseTimeout
- *
- * @return the maxResponseTimeout
- */
- public static long getMaxResponseTimeout() {
- return maxResponseTimeout;
- }
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
deleted file mode 100644
index 2e2c7c63a..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.loader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Class resource common properties loader
- */
-public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
- private static final String FILE_NAME = "common.properties";
-
- /**
- * load properties
- */
- @Override
- public Map<String, String> load() {
- return this.loadProperties();
- }
-
- protected Map<String, String> loadProperties() {
- Map<String, String> result = new ConcurrentHashMap<>();
- URL resource = getClass().getClassLoader().getResource(FILE_NAME);
- try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
- Properties props = new Properties();
- props.load(inStream);
- for (Map.Entry<Object, Object> entry : props.entrySet()) {
- result.put((String) entry.getKey(), (String) entry.getValue());
- }
- } catch (Exception e) {
- LOG.error("fail to load properties from file ={}", FILE_NAME, e);
- }
- return result;
- }
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
deleted file mode 100644
index 92d8f7061..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.config.loader;
-
-import java.util.Map;
-
-/**
- * Interface of common properties loader
- */
-public interface CommonPropertiesLoader {
-
- /**
- * load
- *
- * @return the configuration map
- */
- Map<String, String> load();
-
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 33793a4ff..0dd453e3e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -95,16 +95,6 @@ public class ConfigConstants {
public static final String L5_ID_KEY = "l5id";
public static final String SET_KEY = "set";
public static final String CLUSTER_ID_KEY = "clusterId";
- public static final String MANAGER_HOST = "manager.hosts";
- public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
- public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
- public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
- public static final String PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
- public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
- public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
- public static final String SOURCE_NO_TOPIC_ACCEPT = "source.topic.notfound.accept";
- public static final String SINK_NO_TOPIC_RESEND = "sink.topic.notfound.resend";
- public static final String ENABLE_WHITELIST = "proxy.enable.whitelist";
public static final String DECODER_BODY = "body";
public static final String DECODER_TOPICKEY = "topic_key";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 72f753680..f45048bf1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -36,6 +36,7 @@ import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
import org.apache.inlong.dataproxy.config.AuthUtils;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -54,12 +55,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
@Slf4j
public class HeartbeatManager implements AbstractHeartbeatManager {
- public static final String DEFAULT_CLUSTER_TAG = "default_cluster";
- public static final String DEFAULT_CLUSTER_NAME = "default_dataproxy";
- public static final String DEFAULT_CLUSTER_INCHARGES = "admin";
- // predefined format of ext tag: {key}={value}
- public static final String DEFAULT_CLUSTER_EXT_TAG = "default=true";
-
private final CloseableHttpClient httpClient;
private final Gson gson;
@@ -88,10 +83,20 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
if (null == heartbeat) {
return;
}
- ConfigManager configManager = ConfigManager.getInstance();
- final String managerHost = configManager.getCommonProperties().get(ConfigConstants.MANAGER_HOST);
+ List<String> mgrHostPorts = CommonConfigHolder.getInstance().getManagerHosts();
+ if (mgrHostPorts.isEmpty()) {
+ return;
+ }
+ for (String managerHost : mgrHostPorts) {
+ if (sendHeartBeatMsg(managerHost, heartbeat)) {
+ return;
+ }
+ }
+ }
+
+ private boolean sendHeartBeatMsg(String mgrHostPort, HeartbeatMsg heartbeat) {
final String url =
- "http://" + managerHost + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_HEARTBEAT_REPORT;
+ "http://" + mgrHostPort + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_HEARTBEAT_REPORT;
try {
HttpPost post = new HttpPost(url);
post.addHeader(HttpHeaders.CONNECTION, "close");
@@ -102,15 +107,18 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
post.setEntity(stringEntity);
CloseableHttpResponse response = httpClient.execute(post);
String isSuccess = EntityUtils.toString(response.getEntity());
- if (StringUtils.isNotEmpty(isSuccess)
- && response.getStatusLine().getStatusCode() == 200) {
- if (log.isDebugEnabled()) {
- log.debug("reportHeartbeat url {}, heartbeat: {}, return str {}", url, body, isSuccess);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ if (StringUtils.isNotEmpty(isSuccess)) {
+ if (log.isDebugEnabled()) {
+ log.debug("reportHeartbeat url {}, heartbeat: {}, return str {}", url, body, isSuccess);
+ }
}
+ return true;
}
} catch (Exception ex) {
log.error("reportHeartbeat failed for url {}", url, ex);
}
+ return false;
}
private synchronized CloseableHttpClient constructHttpClient() {
@@ -139,15 +147,10 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
heartbeatMsg.setLoad(0xffff);
- Map<String, String> commonProperties = configManager.getCommonProperties();
- heartbeatMsg.setClusterTag(commonProperties.getOrDefault(
- ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG));
- heartbeatMsg.setClusterName(commonProperties.getOrDefault(
- ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
- heartbeatMsg.setInCharges(commonProperties.getOrDefault(
- ConfigConstants.PROXY_CLUSTER_INCHARGES, DEFAULT_CLUSTER_INCHARGES));
- heartbeatMsg.setExtTag(commonProperties.getOrDefault(
- ConfigConstants.PROXY_CLUSTER_EXT_TAG, DEFAULT_CLUSTER_EXT_TAG));
+ heartbeatMsg.setClusterTag(CommonConfigHolder.getInstance().getClusterTag());
+ heartbeatMsg.setClusterName(CommonConfigHolder.getInstance().getClusterName());
+ heartbeatMsg.setInCharges(CommonConfigHolder.getInstance().getClusterIncharges());
+ heartbeatMsg.setExtTag(CommonConfigHolder.getInstance().getClusterExtTag());
Map<String, String> groupIdMappings = configManager.getGroupIdMappingProperties();
Map<String, Map<String, String>> streamIdMappings = configManager.getStreamIdMappingProperties();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 7faa75d64..70267cd59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
-import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -77,11 +77,7 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
statIntervalSec, maxMonitorCnt);
}
// register metrics
- ConfigManager configManager = ConfigManager.getInstance();
- String clusterId =
- configManager.getCommonProperties().getOrDefault(
- ConfigConstants.PROXY_CLUSTER_NAME,
- ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ String clusterId = CommonConfigHolder.getInstance().getClusterName();
this.metricItemSet =
new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
MetricRegister.register(metricItemSet);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
index 7187ad760..644aef6ad 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -26,7 +26,7 @@ import org.apache.inlong.common.metric.Dimension;
import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItem;
import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -144,7 +144,7 @@ public class DataProxyMetricItem extends MetricItem {
*/
public static void fillAuditFormatTime(Event event, Map<String, String> dimensions) {
long msgTime = (event != null) ? AuditUtils.getLogTime(event) : System.currentTimeMillis();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long auditFormatTime = msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index 9225760f0..6a3152a99 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -24,7 +24,7 @@ import org.apache.flume.Event;
import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItemSet;
import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
/**
@@ -116,7 +116,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
event.getHeaders().get(AttributeConstants.DATA_TIME));
long msgCount = NumberUtils.toLong(
event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long auditFormatTime =
+ dataTime - dataTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
if (isSource) {
dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, name);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 664c0340d..703a466d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -17,21 +17,16 @@
package org.apache.inlong.dataproxy.metrics.audit;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
-
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
/**
@@ -39,39 +34,21 @@ import java.util.Map;
*/
public class AuditUtils {
- public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
- public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
- public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
- public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
- public static final String AUDIT_KEY_PROXYS = "audit.proxys";
- public static final String AUDIT_KEY_IS_AUDIT = "audit.enable";
-
public static final int AUDIT_ID_DATAPROXY_READ_SUCCESS = 5;
public static final int AUDIT_ID_DATAPROXY_SEND_SUCCESS = 6;
- private static boolean IS_AUDIT = true;
-
/**
* Init audit
*/
public static void initAudit() {
- // IS_AUDIT
- IS_AUDIT = BooleanUtils.toBoolean(CommonPropertiesHolder.getString(AUDIT_KEY_IS_AUDIT));
- if (IS_AUDIT) {
+ if (CommonConfigHolder.getInstance().isEnableAudit()) {
// AuditProxy
- String strIpPorts = CommonPropertiesHolder.getString(AUDIT_KEY_PROXYS);
- HashSet<String> proxys = new HashSet<>();
- if (!StringUtils.isBlank(strIpPorts)) {
- String[] ipPorts = strIpPorts.split("\\s+");
- Collections.addAll(proxys, ipPorts);
- }
- AuditOperator.getInstance().setAuditProxy(proxys);
+ AuditOperator.getInstance().setAuditProxy(
+ CommonConfigHolder.getInstance().getAuditProxys());
// AuditConfig
- String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
- int maxCacheRow = NumberUtils.toInt(
- CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
- AUDIT_DEFAULT_MAX_CACHE_ROWS);
- AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+ AuditConfig auditConfig = new AuditConfig(
+ CommonConfigHolder.getInstance().getAuditFilePath(),
+ CommonConfigHolder.getInstance().getAuditMaxCacheRows());
AuditOperator.getInstance().setAuditConfig(auditConfig);
}
}
@@ -80,7 +57,7 @@ public class AuditUtils {
* Add audit data
*/
public static void add(int auditID, Event event) {
- if (!IS_AUDIT || event == null) {
+ if (!CommonConfigHolder.getInstance().isEnableAudit() || event == null) {
return;
}
Map<String, String> headers = event.getHeaders();
@@ -138,7 +115,7 @@ public class AuditUtils {
* Get AuditFormatTime
*/
public static long getAuditFormatTime(long msgTime) {
- return msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ return msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
index 076f37ecc..58cef4173 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -47,8 +47,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.inlong.common.metric.MetricValue;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.common.metric.MetricItemValue;
import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +63,6 @@ import io.prometheus.client.exporter.HTTPServer;
*/
public class PrometheusMetricListener extends Collector implements MetricListener {
- public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
- public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
public static final String DEFAULT_DIMENSION_LABEL = "dimension";
private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
protected HTTPServer httpServer;
@@ -79,7 +76,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
* Constructor
*/
public PrometheusMetricListener() {
- this.metricName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+ this.metricName = CommonConfigHolder.getInstance().getClusterName();
this.metricItem = new DataProxyMetricItem();
this.metricItem.clusterId = metricName;
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -109,9 +106,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
metricValueMap.put(M_NODE_DURATION, metricItem.nodeDuration);
metricValueMap.put(M_WHOLE_DURATION, metricItem.wholeDuration);
- int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
try {
- this.httpServer = new HTTPServer(httpPort);
+ this.httpServer = new HTTPServer(CommonConfigHolder.getInstance().getPrometheusHttpPort());
this.register();
} catch (IOException e) {
LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 98a8c21a7..edc8c885a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -47,8 +47,8 @@ import org.apache.flume.node.StaticZooKeeperConfigurationProvider;
import org.apache.flume.util.SSLUtil;
import org.apache.inlong.common.config.IDataProxyConfigHolder;
import org.apache.inlong.common.metric.MetricObserver;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.heartbeat.HeartbeatManager;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.sdk.commons.admin.AdminTask;
@@ -210,7 +210,7 @@ public class Application {
}
}
// metrics
- MetricObserver.init(CommonPropertiesHolder.get());
+ MetricObserver.init(CommonConfigHolder.getInstance().getProperties());
// audit
AuditUtils.initAudit();
@@ -236,7 +236,7 @@ public class Application {
* Start by Manager config
*/
private static void startByManagerConf(CommandLine commandLine) {
- String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+ String proxyName = CommonConfigHolder.getInstance().getClusterName();
ManagerPropsConfigProvider configurationProvider = new ManagerPropsConfigProvider(proxyName);
Application application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
@@ -267,7 +267,7 @@ public class Application {
supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
// start admin task
- this.adminTask = new AdminTask(new Context(CommonPropertiesHolder.get()));
+ this.adminTask = new AdminTask(new Context(CommonConfigHolder.getInstance().getProperties()));
this.adminTask.start();
HeartbeatManager heartbeatManager = new HeartbeatManager();
heartbeatManager.start();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index a28493bad..8641d0606 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -21,7 +21,7 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -45,10 +45,7 @@ public class SinkContext {
public static final String KEY_MAX_THREADS = "maxThreads";
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
- public static final String KEY_EVENT_HANDLER = "eventHandler";
public static final String KEY_MESSAGE_QUEUE_HANDLER = "messageQueueHandler";
- public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
- public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
protected final String clusterId;
protected final String sinkName;
@@ -70,7 +67,7 @@ public class SinkContext {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId = CommonPropertiesHolder.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+ this.clusterId = CommonConfigHolder.getInstance().getClusterName();
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
@@ -199,8 +196,7 @@ public class SinkContext {
*/
public EventHandler createEventHandler() {
// IEventHandler
- String eventHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_HANDLER,
- DefaultEventHandler.class.getName());
+ String eventHandlerClass = CommonConfigHolder.getInstance().getEventHandler();
try {
Class<?> handlerClass = ClassUtils.getClass(eventHandlerClass);
Object handlerObject = handlerClass.getDeclaredConstructor().newInstance();
@@ -241,9 +237,6 @@ public class SinkContext {
* @return
*/
public static BufferQueue<BatchPackProfile> createBufferQueue() {
- int maxBufferQueueSizeKb = CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
- DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
- BufferQueue<BatchPackProfile> dispatchQueue = new BufferQueue<BatchPackProfile>(maxBufferQueueSizeKb);
- return dispatchQueue;
+ return new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 245971db1..8308af8ec 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -22,8 +22,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -43,7 +43,6 @@ public class MessageQueueZoneSinkContext extends SinkContext {
public static final String KEY_NODE_ID = "nodeId";
public static final String PREFIX_PRODUCER = "producer.";
public static final String KEY_COMPRESS_TYPE = "compressType";
- public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
private final BufferQueue<BatchPackProfile> dispatchQueue;
@@ -63,18 +62,18 @@ public class MessageQueueZoneSinkContext extends SinkContext {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
// proxyClusterId
- this.proxyClusterId = CommonPropertiesHolder.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+ this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
// nodeId
- this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
+ this.nodeId = CommonConfigHolder.getInstance().getProxyNodeId();
// compressionType
- String strCompressionType = CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE,
- INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
+ String strCompressionType = CommonConfigHolder.getInstance().getMsgCompressType();
this.compressType = INLONG_COMPRESSED_TYPE.valueOf(strCompressionType);
// producerContext
Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
this.producerContext = new Context(producerParams);
// idTopicHolder
- Context commonPropertiesContext = new Context(CommonPropertiesHolder.get());
+ Context commonPropertiesContext =
+ new Context(CommonConfigHolder.getInstance().getProperties());
this.idTopicHolder = new IdTopicConfigHolder();
this.idTopicHolder.configure(commonPropertiesContext);
// cacheHolder
@@ -185,7 +184,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
final long currentTime = System.currentTimeMillis();
currentRecord.getEvents().forEach(event -> {
long msgTime = event.getMsgTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long auditFormatTime =
+ msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
if (result) {
@@ -220,7 +220,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, mqName);
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
long msgTime = currentRecord.getDispatchTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long auditFormatTime =
+ msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
long count = currentRecord.getCount();
@@ -244,7 +245,8 @@ public class MessageQueueZoneSinkContext extends SinkContext {
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "-");
long msgTime = System.currentTimeMillis();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long auditFormatTime =
+ msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
metricItem.sendFailCount.incrementAndGet();
@@ -279,14 +281,13 @@ public class MessageQueueZoneSinkContext extends SinkContext {
* createCacheClusterSelector
*/
public CacheClusterSelector createCacheClusterSelector() {
- String strSelectorClass = CommonPropertiesHolder.getString(KEY_CACHE_CLUSTER_SELECTOR,
- AllCacheClusterSelector.class.getName());
+ String strSelectorClass = CommonConfigHolder.getInstance().getCacheClusterSelector();
try {
Class<?> selectorClass = ClassUtils.getClass(strSelectorClass);
Object selectorObject = selectorClass.getDeclaredConstructor().newInstance();
if (selectorObject instanceof Configurable) {
Configurable configurable = (Configurable) selectorObject;
- configurable.configure(new Context(CommonPropertiesHolder.get()));
+ configurable.configure(new Context(CommonConfigHolder.getInstance().getProperties()));
}
if (selectorObject instanceof CacheClusterSelector) {
CacheClusterSelector selector = (CacheClusterSelector) selectorObject;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 64612e79e..9eaebb56f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -36,7 +36,7 @@ import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
-import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
@@ -168,11 +168,7 @@ public abstract class BaseSource
}
super.start();
// initial metric item set
- ConfigManager configManager = ConfigManager.getInstance();
- String clusterId =
- configManager.getCommonProperties().getOrDefault(
- ConfigConstants.PROXY_CLUSTER_NAME,
- ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ String clusterId = CommonConfigHolder.getInstance().getClusterName();
this.metricItemSet =
new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
MetricRegister.register(metricItemSet);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index e31f5e40d..a846b3df7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -47,6 +47,7 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.base.ProxyMessage;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -395,10 +396,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
// check topic configure
if (StringUtils.isEmpty(configTopic)) {
- String acceptMsg =
- configManager.getCommonProperties().getOrDefault(
- ConfigConstants.SOURCE_NO_TOPIC_ACCEPT, "false");
- if ("true".equalsIgnoreCase(acceptMsg)) {
+ if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
configTopic = this.defaultTopic;
} else {
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
index c9f4d178c..be8893997 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
@@ -23,8 +23,7 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
@@ -84,8 +83,7 @@ public class SourceContext {
public SourceContext(AbstractSource source, ChannelGroup allChannels, Context context) {
this.source = source;
this.allChannels = allChannels;
- this.proxyClusterId = CommonPropertiesHolder.get()
- .getOrDefault(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME, "unknown");
+ this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
this.sourceId = source.getName();
// metric
this.metricItemSet = new DataProxyMetricItemSet(sourceId);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 4d7864993..80b433c10 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -18,8 +18,8 @@
package org.apache.inlong.dataproxy.source.tcp;
import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.SourceContext;
@@ -147,10 +147,10 @@ public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
this.responsePackage(ctx, ResultCode.SUCCUSS, packObject);
}
// process
- if (!CommonPropertiesHolder.isResponseAfterSave()) {
- this.processAndResponse(ctx, packObject, events);
- } else {
+ if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
this.processAndWaitingSave(ctx, packObject, events);
+ } else {
+ this.processAndResponse(ctx, packObject, events);
}
}
@@ -174,8 +174,8 @@ public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
events.forEach(event -> {
this.addMetric(true, event.getBody().length, event);
});
- boolean awaitResult = callback.getLatch().await(CommonPropertiesHolder.getMaxResponseTimeout(),
- TimeUnit.MILLISECONDS);
+ boolean awaitResult = callback.getLatch().await(
+ CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), TimeUnit.MILLISECONDS);
if (!awaitResult) {
if (!callback.getHasResponsed().getAndSet(true)) {
this.responsePackage(ctx, ResultCode.ERR_REJECT, packObject);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
similarity index 58%
rename from inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
rename to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 77773cffc..7102fef1e 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -15,30 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.config.loader;
+package org.apache.inlong.dataproxy.config.holder;
import org.apache.inlong.common.metric.MetricListener;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.junit.Assert;
import org.junit.Test;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
/**
- * Test for {@link ClassResourceCommonPropertiesLoader}
+ * Test for {@link CommonConfigHolder}
*/
-public class TestClassResourceCommonPropertiesLoader {
+public class TestCommonConfigHolder {
- /**
- * testResult
- */
@Test
- public void testResult() {
- // increase source
- ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
- Map<String, String> props = loader.load();
- assertEquals("proxy_inlong5th_sz", props.get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME));
- assertEquals("DataProxy", props.get(MetricListener.KEY_METRIC_DOMAINS));
+ public void testCase() {
+ Assert.assertEquals("proxy_inlong5th_sz",
+ CommonConfigHolder.getInstance().getClusterName());
+ Assert.assertTrue(CommonConfigHolder.getInstance().isEnableWhiteList());
+ assertEquals("DataProxy",
+ CommonConfigHolder.getInstance().getProperties().get(MetricListener.KEY_METRIC_DOMAINS));
}
}