You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by "healchow (via GitHub)" <gi...@apache.org> on 2023/04/27 12:15:16 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #7942: [INLONG-7931][DataProxy] Optimize common.properties related control mechanism

healchow commented on code in PR #7942:
URL: https://github.com/apache/inlong/pull/7942#discussion_r1179061423


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java:
##########
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.sink.common.DefaultEventHandler;
+import org.apache.inlong.dataproxy.sink.mq.AllCacheClusterSelector;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Holder for the configuration loaded from common.properties
+ */
+public class CommonConfigHolder {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CommonConfigHolder.class);
+    // configure file name
+    private static final String COMMON_CONFIG_FILE_NAME = "common.properties";
+    // **** allowed keys and default value, begin
+    // cluster tag
+    public static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+    public static final String VAL_DEF_CLUSTER_TAG = "default_cluster";
+    // cluster name
+    public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
+    public static final String VAL_DEF_CLUSTER_NAME = "default_dataproxy";
+    // cluster incharges
+    public static final String KEY_PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
+    public static final String VAL_DEF_CLUSTER_INCHARGES = "admin";
+    // cluster ext tag
+    public static final String KEY_PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
+    // predefined format of ext tag: {key}={value}
+    public static final String VAL_DEF_CLUSTER_EXT_TAG = "default=true";
+    // manager type
+    public static final String KEY_MANAGER_TYPE = "manager.type";
+    public static final String VAL_DEF_MANAGER_TYPE = DefaultManagerIpListParser.class.getName();
+    // manager hosts
+    public static final String KEY_MANAGER_HOSTS = "manager.hosts";
+    public static final String KEY_MANAGER_HOSTS_SEPARATOR = ",";
+    // manager auth secret id
+    public static final String KEY_MANAGER_AUTH_SECRET_ID = "manager.auth.secretId";
+    // manager auth secret key
+    public static final String KEY_MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey";
+    // configuration file check interval
+    private static final String KEY_CONFIG_CHECK_INTERVAL_MS = "configCheckInterval";
+    public static final long VAL_DEF_CONFIG_CHECK_INTERVAL_MS = 60000L;
+    // Whether to accept messages without mapping between groupId/streamId and topic
+    public static final String KEY_NOTFOUND_TOPIC_ACCEPT = "source.topic.notfound.accept";
+    public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+    // whether enable whitelist, optional field.
+    public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
+    public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+    // Audit fields
+    public static final String KEY_ENABLE_AUDIT = "audit.enable";
+    public static final boolean VAL_DEF_ENABLE_AUDIT = true;
+    public static final String KEY_AUDIT_PROXYS = "audit.proxys";
+    public static final String KEY_AUDIT_FILE_PATH = "audit.filePath";
+    public static final String VAL_DEF_AUDIT_FILE_PATH = "/data/inlong/audit/";
+    public static final String KEY_AUDIT_MAX_CACHE_ROWS = "audit.maxCacheRows";
+    public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
+    public static final String KEY_AUDIT_FORMAT_INTERVAL_MS = "auditFormatInterval";
+    public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
+    // Whether response after save msg
+    public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
+    public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
+    // Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
+    public static final String KEY_MAX_RAS_TIMEOUT_MS = "maxRASTimeoutMs";
+    public static final long VAL_DEF_MAX_RAS_TIMEOUT_MS = 10000L;
+    // max buffer queue size in Kb
+    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+    public static final int VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+    // event handler
+    public static final String KEY_EVENT_HANDLER = "eventHandler";
+    public static final String VAL_DEF_EVENT_HANDLER = DefaultEventHandler.class.getName();
+    // cache cluster selector
+    public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
+    public static final String VAL_DEF_CACHE_CLUSTER_SELECTOR = AllCacheClusterSelector.class.getName();
+    // proxy node id
+    public static final String KEY_PROXY_NODE_ID = "nodeId";
+    public static final String VAL_DEF_PROXY_NODE_ID = "127.0.0.1";
+    // msg sent compress type
+    public static final String KEY_MSG_SENT_COMPRESS_TYPE = "compressType";
+    public static final String VAL_DEF_MSG_COMPRESS_TYPE = ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name();
+    // prometheus http port
+    public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+    public static final int VAL_DEF_PROMETHEUS_HTTP_PORT = 8080;
+    // **** allowed keys and default value, end
+
+    // class instance
+    private static CommonConfigHolder instance = null;
+    private static volatile boolean isInit = false;
+    private Map<String, String> props;
+    // pre-read field values
+    private String clusterTag = VAL_DEF_CLUSTER_TAG;
+    private String clusterName = VAL_DEF_CLUSTER_NAME;
+    private String clusterIncharges = VAL_DEF_CLUSTER_INCHARGES;
+    private String clusterExtTag = VAL_DEF_CLUSTER_EXT_TAG;
+    private String managerType = VAL_DEF_MANAGER_TYPE;
+    private IManagerIpListParser ipListParser = null;
+    private String managerAuthSecretId = "";
+    private String managerAuthSecretKey = "";
+    private long configChkInvlMs = VAL_DEF_CONFIG_CHECK_INTERVAL_MS;
+    private boolean enableAudit = VAL_DEF_ENABLE_AUDIT;
+    private final HashSet<String> auditProxys = new HashSet<>();
+    private String auditFilePath = VAL_DEF_AUDIT_FILE_PATH;
+    private int auditMaxCacheRows = VAL_DEF_AUDIT_MAX_CACHE_ROWS;
+    private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
+    private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
+    private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
+    private boolean noTopicAccept = VAL_DEF_NOTFOUND_TOPIC_ACCEPT;
+    private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
+    private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
+    private String eventHandler = VAL_DEF_EVENT_HANDLER;
+    private String cacheClusterSelector = VAL_DEF_CACHE_CLUSTER_SELECTOR;
+    private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
+    private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
+    private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+
+    /**
+     * Returns the shared {@code CommonConfigHolder}, creating it on first use.
+     *
+     * On the first call the holder is constructed under the class lock; if
+     * {@code loadConfigFile()} reports success, {@code preReadFields()} is run.
+     * {@code isInit} is set to true afterwards whether or not loading succeeded,
+     * so loading is not retried by later calls.
+     *
+     * @return the shared holder instance
+     */
+    public static CommonConfigHolder getInstance() {
+        // slow path only while the holder has not been published yet
+        if (!isInit || instance == null) {
+            synchronized (CommonConfigHolder.class) {
+                // re-check under the lock: another thread may have finished initialising
+                if (!isInit) {
+                    instance = new CommonConfigHolder();
+                    final boolean loaded = instance.loadConfigFile();
+                    if (loaded) {
+                        instance.preReadFields();
+                    }
+                    isInit = true;
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the original attribute map held by this holder.
+     *
+     * Notice: only the non-pre-read fields should be looked up in this map;
+     *         pre-read fields MUST be obtained through the dedicated getters of this class.
+     *
+     * NOTE(review): the {@code props} field has no initializer in its declaration, so this
+     * may return null if nothing assigned it; the assignment is not visible here - confirm.
+     *
+     * @return the attribute map (the internal reference, not a copy)
+     */
+    public Map<String, String> getProperties() {
+        return props;
+    }
+
+    /**
+     * Resolves a string setting, preferring the given context over the common properties.
+     *
+     * The value is taken from {@code context} when it has a non-null entry for {@code key};
+     * otherwise it is looked up in the attribute map of the shared holder, and
+     * {@code defaultValue} is used when the map has no entry for the key either.
+     *
+     * @param context       the Flume context consulted first
+     * @param key           the configuration key to look up
+     * @param defaultValue  the value returned when neither source defines the key
+     * @return the resolved value
+     */
+    public static String getStringFromContext(Context context, String key, String defaultValue) {
+        final String ctxValue = context.getString(key);
+        if (ctxValue != null) {
+            return ctxValue;
+        }
+        // fall back to the attribute map of the shared holder
+        return getInstance().getProperties().getOrDefault(key, defaultValue);
+    }
+
+    /**
+     * @return the cluster tag held by this holder (initialised to {@link #VAL_DEF_CLUSTER_TAG})
+     */
+    public String getClusterTag() {
+        return this.clusterTag;
+    }
+
+    /**
+     * @return the cluster name held by this holder (initialised to {@link #VAL_DEF_CLUSTER_NAME})
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * @return the cluster in-charges value held by this holder
+     *         (initialised to {@link #VAL_DEF_CLUSTER_INCHARGES})
+     */
+    public String getClusterIncharges() {
+        return this.clusterIncharges;
+    }
+
+    /**
+     * @return the cluster ext tag held by this holder
+     *         (initialised to {@link #VAL_DEF_CLUSTER_EXT_TAG})
+     */
+    public String getClusterExtTag() {
+        return this.clusterExtTag;
+    }
+
+    /**
+     * @return the configuration check interval held by this holder
+     *         (initialised to {@link #VAL_DEF_CONFIG_CHECK_INTERVAL_MS})
+     */
+    public long getConfigChkInvlMs() {
+        return this.configChkInvlMs;
+    }
+
+    /**
+     * @return the "accept when topic not found" flag held by this holder
+     *         (initialised to {@link #VAL_DEF_NOTFOUND_TOPIC_ACCEPT})
+     */
+    public boolean isNoTopicAccept() {
+        return this.noTopicAccept;
+    }
+
+    /**
+     * @return the whitelist-enabled flag held by this holder
+     *         (initialised to {@link #VAL_DEF_ENABLE_WHITELIST})
+     */
+    public boolean isEnableWhiteList() {
+        return enableWhiteList;
+    }
+
+    /**
+     * @return the manager type held by this holder (initialised to {@link #VAL_DEF_MANAGER_TYPE})
+     */
+    public String getManagerType() {
+        return this.managerType;
+    }
+
+    /**
+     * Returns the manager host list as reported by the configured IP list parser.
+     *
+     * NOTE(review): {@code ipListParser} is initialised to null in its declaration; where it
+     * gets assigned is not visible here, so this call may throw a NullPointerException if the
+     * parser was never set - confirm.
+     *
+     * @return the list returned by {@code ipListParser.getIpList()}
+     */
+    public List<String> getManagerHosts() {
+        return ipListParser.getIpList();
+    }
+
+    /**
+     * @return the manager auth secret id held by this holder (initialised to an empty string)
+     */
+    public String getManagerAuthSecretId() {
+        return this.managerAuthSecretId;
+    }
+
+    /**
+     * @return the manager auth secret key held by this holder (initialised to an empty string)
+     */
+    public String getManagerAuthSecretKey() {
+        return this.managerAuthSecretKey;
+    }
+
+    /**
+     * @return the audit-enabled flag held by this holder
+     *         (initialised to {@link #VAL_DEF_ENABLE_AUDIT})
+     */
+    public boolean isEnableAudit() {
+        return this.enableAudit;
+    }
+
+    /**
+     * Returns the audit proxy set held by this holder.
+     *
+     * The internal set itself is returned, not a copy, so changes made by the caller
+     * are visible through this holder.
+     *
+     * @return the internal audit proxy set (created empty)
+     */
+    public HashSet<String> getAuditProxys() {
+        return this.auditProxys;
+    }
+
+    /**
+     * @return the audit file path held by this holder
+     *         (initialised to {@link #VAL_DEF_AUDIT_FILE_PATH})
+     */
+    public String getAuditFilePath() {
+        return this.auditFilePath;
+    }
+
+    /**
+     * @return the audit max cache rows held by this holder
+     *         (initialised to {@link #VAL_DEF_AUDIT_MAX_CACHE_ROWS})
+     */
+    public int getAuditMaxCacheRows() {
+        return this.auditMaxCacheRows;
+    }
+
+    /**
+     * @return the audit format interval held by this holder
+     *         (initialised to {@link #VAL_DEF_AUDIT_FORMAT_INTERVAL_MS})
+     */
+    public long getAuditFormatInvlMs() {
+        return this.auditFormatInvlMs;
+    }
+
+    /**
+     * @return the response-after-save flag held by this holder
+     *         (initialised to {@link #VAL_DEF_RESPONSE_AFTER_SAVE})
+     */
+    public boolean isResponseAfterSave() {
+        return this.responseAfterSave;
+    }
+
+    /**
+     * @return the max response-after-save timeout held by this holder
+     *         (initialised to {@link #VAL_DEF_MAX_RAS_TIMEOUT_MS})
+     */
+    public long getMaxResAfterSaveTimeout() {
+        return this.maxResAfterSaveTimeout;
+    }
+
+    /**
+     * @return the max buffer queue size held by this holder
+     *         (initialised to {@link #VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB})
+     */
+    public int getMaxBufferQueueSizeKb() {
+        return this.maxBufferQueueSizeKb;
+    }
+
+    /**
+     * @return the event handler value held by this holder
+     *         (initialised to {@link #VAL_DEF_EVENT_HANDLER})
+     */
+    public String getEventHandler() {
+        return this.eventHandler;
+    }
+
+    /**
+     * @return the cache cluster selector value held by this holder
+     *         (initialised to {@link #VAL_DEF_CACHE_CLUSTER_SELECTOR})
+     */
+    public String getCacheClusterSelector() {
+        return this.cacheClusterSelector;
+    }
+
+    /**
+     * @return the Prometheus HTTP port held by this holder
+     *         (initialised to {@link #VAL_DEF_PROMETHEUS_HTTP_PORT})
+     */
+    public int getPrometheusHttpPort() {
+        return this.prometheusHttpPort;
+    }
+
+    /**
+     * @return the proxy node id held by this holder
+     *         (initialised to {@link #VAL_DEF_PROXY_NODE_ID})
+     */
+    public String getProxyNodeId() {
+        return this.proxyNodeId;
+    }
+
+    /**
+     * @return the message compress type held by this holder
+     *         (initialised to {@link #VAL_DEF_MSG_COMPRESS_TYPE})
+     */
+    public String getMsgCompressType() {
+        return this.msgCompressType;
+    }
+
+    private void preReadFields() {
+        String tmpValue;

Review Comment:
   Could we use Properties or a JSON parsing tool to parse the configuration file automatically and map it to a Java POJO?
   The current approach is not very flexible: modifying an existing configuration item or adding a new one also requires changing the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org