You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/06 09:58:10 UTC
[incubator-inlong] branch master updated: [INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3ee0eba3f [INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)
3ee0eba3f is described below
commit 3ee0eba3f5ffd81e05c40d27e4550d4cd5eb9240
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 6 17:58:04 2022 +0800
[INLONG-4521][DataProxy][Manager] Change the naming of third-party cluster related classes (#4526)
* Change the naming of third-party cluster related classes
* Fix code style and unit test errors
---
.../common/pojo/dataproxy/DataProxyCluster.java | 15 +-
.../common/pojo/dataproxy/DataProxyConfig.java | 50 ++-----
...ataProxyConfig.java => DataProxyTopicInfo.java} | 48 ++++--
...irdPartyClusterInfo.java => MQClusterInfo.java} | 6 +-
.../pojo/dataproxy/ThirdPartyClusterDTO.java | 43 ------
inlong-dataproxy/bin/dataproxy-start.sh | 2 +-
.../inlong/dataproxy/config/ConfigManager.java | 142 +++++++++---------
.../inlong/dataproxy/config/RemoteConfigJson.java | 6 +-
.../dataproxy/config/RemoteConfigManager.java | 120 ++++++---------
.../config/holder/CommonPropertiesHolder.java | 59 ++++----
...onfigHolder.java => MQClusterConfigHolder.java} | 27 ++--
.../config/holder/PropertiesConfigHolder.java | 2 +-
.../ClassResourceCommonPropertiesLoader.java | 42 +++---
.../config/loader/CommonPropertiesLoader.java | 14 +-
...artyClusterConfig.java => MQClusterConfig.java} | 162 +++++++++++----------
.../dataproxy/consts/AttributeConstants.java | 59 ++++----
.../inlong/dataproxy/consts/ConfigConstants.java | 4 +
.../org/apache/inlong/dataproxy/http/Context.java | 2 +-
.../inlong/dataproxy/http/HttpBaseSource.java | 53 +++----
.../inlong/dataproxy/http/HttpSourceConstants.java | 30 ----
.../inlong/dataproxy/http/MappedContext.java | 7 +-
.../inlong/dataproxy/http/MessageFilter.java | 37 ++---
.../inlong/dataproxy/http/MessageHandler.java | 3 +-
.../dataproxy/http/MessageProcessServlet.java | 34 ++---
.../inlong/dataproxy/http/SimpleHttpSource.java | 3 +-
.../dataproxy/http/SimpleMessageHandler.java | 153 ++++++-------------
.../apache/inlong/dataproxy/http/StatusCode.java | 8 +-
.../apache/inlong/dataproxy/sink/PulsarSink.java | 16 +-
.../apache/inlong/dataproxy/sink/SinkContext.java | 48 +++---
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 125 ++++------------
.../dataproxy/sink/pulsar/PulsarClientService.java | 8 +-
.../inlong/dataproxy/sink/pulsar/SinkTask.java | 4 +-
.../TestMQClusterConfigLoader.java} | 29 ++--
.../TestClassResourceCommonPropertiesLoader.java | 21 ++-
.../TestContextCacheClusterConfigLoader.java | 43 +++---
.../loader/TestContextIdTopicConfigLoader.java | 42 +++---
.../metrics/TestDataProxyMetricItemSet.java | 6 +-
.../metrics/TestMetricListenerRunnable.java | 6 +-
.../apache/inlong/dataproxy/utils/MockUtils.java | 6 +-
.../src/test/resources/common.properties | 29 ++--
.../src/test/resources/dataproxy-pulsar.conf | 27 ++--
...ty_cluster.properties => mq_cluster.properties} | 7 +-
.../src/test/resources/topics.properties | 2 +-
.../manager/service/core/InlongClusterService.java | 4 +-
.../core/impl/InlongClusterServiceImpl.java | 20 +--
.../manager/service/mq/util/PulsarUtils.java | 6 +-
.../controller/openapi/DataProxyController.java | 11 +-
47 files changed, 657 insertions(+), 934 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
index c293c1144..344caf330 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyCluster.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -21,12 +21,13 @@ package org.apache.inlong.common.pojo.dataproxy;
* DataProxyCluster
*/
public class DataProxyCluster {
+
private ProxyClusterObject proxyCluster = new ProxyClusterObject();
private CacheClusterSetObject cacheClusterSet = new CacheClusterSetObject();
/**
* get proxyCluster
- *
+ *
* @return the proxyCluster
*/
public ProxyClusterObject getProxyCluster() {
@@ -35,7 +36,7 @@ public class DataProxyCluster {
/**
* set proxyCluster
- *
+ *
* @param proxyCluster the proxyCluster to set
*/
public void setProxyCluster(ProxyClusterObject proxyCluster) {
@@ -44,7 +45,7 @@ public class DataProxyCluster {
/**
* get cacheClusterSet
- *
+ *
* @return the cacheClusterSet
*/
public CacheClusterSetObject getCacheClusterSet() {
@@ -53,7 +54,7 @@ public class DataProxyCluster {
/**
* set cacheClusterSet
- *
+ *
* @param cacheClusterSet the cacheClusterSet to set
*/
public void setCacheClusterSet(CacheClusterSetObject cacheClusterSet) {
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
index 4b0b6551a..66b8bd74b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
@@ -17,54 +17,32 @@
package org.apache.inlong.common.pojo.dataproxy;
+import java.util.ArrayList;
+import java.util.List;
+
/**
- * DataProxy config
+ * Data proxy config, includes mq clusters and topic list.
*/
public class DataProxyConfig {
- private String topic;
- private String m;
- private String inlongGroupId;
-
- public DataProxyConfig() {
- }
-
- public DataProxyConfig(String topic, String m, String inlongGroupId) {
- this.topic = topic;
- this.m = m;
- this.inlongGroupId = inlongGroupId;
- }
-
- @Override
- public String toString() {
- return "DataProxyConfig{topic='" + topic + '\''
- + ", m='" + m + '\''
- + ", inlongGroupId='" + inlongGroupId + '\''
- + '}';
- }
+ private List<MQClusterInfo> mqClusterList = new ArrayList<>();
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
+ private List<DataProxyTopicInfo> topicList = new ArrayList<>();
- public String getM() {
- return m;
+ public List<MQClusterInfo> getMqClusterList() {
+ return mqClusterList;
}
- public void setM(String m) {
- this.m = m;
+ public void setMqClusterList(List<MQClusterInfo> mqClusterList) {
+ this.mqClusterList = mqClusterList;
}
- public String getInlongGroupId() {
- return inlongGroupId;
+ public List<DataProxyTopicInfo> getTopicList() {
+ return topicList;
}
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
+ public void setTopicList(List<DataProxyTopicInfo> topicList) {
+ this.topicList = topicList;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
index 4b0b6551a..d1deaa4f1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
@@ -18,28 +18,44 @@
package org.apache.inlong.common.pojo.dataproxy;
/**
- * DataProxy config
+ * Topic info for DataProxy, includes the topic name and the inlongGroupId to which it belongs.
*/
-public class DataProxyConfig {
+public class DataProxyTopicInfo {
+ /**
+ * The topic name that needs to send data
+ */
private String topic;
- private String m;
+
+ /**
+ * The inlongGroupId to which the topic belongs
+ */
private String inlongGroupId;
- public DataProxyConfig() {
+ /**
+ * The data format, will deprecate in the future
+ */
+ @Deprecated
+ private String m;
+
+ public DataProxyTopicInfo() {
}
- public DataProxyConfig(String topic, String m, String inlongGroupId) {
+ public DataProxyTopicInfo(String topic, String inlongGroupId) {
+ this(topic, inlongGroupId, null);
+ }
+
+ public DataProxyTopicInfo(String topic, String inlongGroupId, String m) {
this.topic = topic;
- this.m = m;
this.inlongGroupId = inlongGroupId;
+ this.m = m;
}
@Override
public String toString() {
- return "DataProxyConfig{topic='" + topic + '\''
- + ", m='" + m + '\''
+ return "DataProxyTopicInfo{topic='" + topic + '\''
+ ", inlongGroupId='" + inlongGroupId + '\''
+ + ", m='" + m + '\''
+ '}';
}
@@ -51,14 +67,6 @@ public class DataProxyConfig {
this.topic = topic;
}
- public String getM() {
- return m;
- }
-
- public void setM(String m) {
- this.m = m;
- }
-
public String getInlongGroupId() {
return inlongGroupId;
}
@@ -67,4 +75,12 @@ public class DataProxyConfig {
this.inlongGroupId = inlongGroupId;
}
+ public String getM() {
+ return m;
+ }
+
+ public void setM(String m) {
+ this.m = m;
+ }
+
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
similarity index 96%
rename from inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 7a635263b..7b7dd3179 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -20,7 +20,11 @@ package org.apache.inlong.common.pojo.dataproxy;
import java.util.HashMap;
import java.util.Map;
-public class ThirdPartyClusterInfo {
+/**
+ * MQ cluster info.
+ */
+public class MQClusterInfo {
+
private String url;
private String token;
private Map<String, String> params = new HashMap<>();
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java
deleted file mode 100644
index 87e573eb3..000000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/ThirdPartyClusterDTO.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.pojo.dataproxy;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ThirdPartyClusterDTO {
-
- private List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
- private List<DataProxyConfig> topicList = new ArrayList<>();
-
- public List<ThirdPartyClusterInfo> getMqSet() {
- return mqSet;
- }
-
- public void setMqSet(List<ThirdPartyClusterInfo> mqSet) {
- this.mqSet = mqSet;
- }
-
- public List<DataProxyConfig> getTopicList() {
- return topicList;
- }
-
- public void setTopicList(List<DataProxyConfig> topicList) {
- this.topicList = topicList;
- }
-}
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index fcf02a9d1..a0de3894c 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -32,7 +32,7 @@ error() {
fi
}
-for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,third_party_cluster.properties}
+for i in {mx.properties,transfer.properties,weight.properties,common.properties,blacklist.properties,groupid_mapping.properties,dc_mapping.properties,topics.properties,tube_switch.properties,mq_cluster.properties}
do
if [ ! -f "$i" ]; then
touch "$i"
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index b2f99ad5f..ee591f614 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -27,15 +27,16 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.dataproxy.config.holder.FileConfigHolder;
import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
-import org.apache.inlong.dataproxy.config.holder.ThirdPartyClusterConfigHolder;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,36 +46,29 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+/**
+ * Config manager class.
+ */
public class ConfigManager {
- public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<ConfigHolder>();
- private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
+ public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
private static volatile boolean isInit = false;
-
private static ConfigManager instance = null;
- private final PropertiesConfigHolder commonConfig =
- new PropertiesConfigHolder("common.properties");
- private final PropertiesConfigHolder topicConfig =
- new PropertiesConfigHolder("topics.properties");
- private final ThirdPartyClusterConfigHolder thirdPartyClusterConfigHolder =
- new ThirdPartyClusterConfigHolder("third_party_cluster.properties");
+ private final PropertiesConfigHolder commonConfig = new PropertiesConfigHolder("common.properties");
+ private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
+ private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");
- private final GroupIdPropertiesHolder groupIdConfig =
- new GroupIdPropertiesHolder("groupid_mapping.properties");
- private final PropertiesConfigHolder dcConfig =
- new PropertiesConfigHolder("dc_mapping.properties");
- private final PropertiesConfigHolder transferConfig =
- new PropertiesConfigHolder("transfer.properties");
- private final PropertiesConfigHolder tubeSwitchConfig =
- new PropertiesConfigHolder("tube_switch.properties");
- private final PropertiesConfigHolder weightHolder =
- new PropertiesConfigHolder("weight.properties");
- private final FileConfigHolder blackListConfig =
- new FileConfigHolder("blacklist.properties");
+
+ private final GroupIdPropertiesHolder groupIdConfig = new GroupIdPropertiesHolder("groupid_mapping.properties");
+ private final PropertiesConfigHolder dcConfig = new PropertiesConfigHolder("dc_mapping.properties");
+ private final PropertiesConfigHolder transferConfig = new PropertiesConfigHolder("transfer.properties");
+ private final PropertiesConfigHolder tubeSwitchConfig = new PropertiesConfigHolder("tube_switch.properties");
+ private final PropertiesConfigHolder weightHolder = new PropertiesConfigHolder("weight.properties");
+ private final FileConfigHolder blackListConfig = new FileConfigHolder("blacklist.properties");
/**
- * get instance for manager
+ * get instance for config manager
*/
public static ConfigManager getInstance() {
if (isInit && instance != null) {
@@ -145,8 +139,8 @@ public class ConfigManager {
return updatePropertiesHolder(result, topicConfig, false);
}
- public boolean updateThirdPartyClusterProperties(Map<String, String> result) {
- return updatePropertiesHolder(result, thirdPartyClusterConfigHolder, true);
+ public boolean updateMQClusterProperties(Map<String, String> result) {
+ return updatePropertiesHolder(result, mqClusterConfigHolder, true);
}
public Map<String, String> getMxProperties() {
@@ -197,16 +191,16 @@ public class ConfigManager {
return topicConfig;
}
- public ThirdPartyClusterConfigHolder getThirdPartyClusterHolder() {
- return thirdPartyClusterConfigHolder;
+ public MQClusterConfigHolder getMqClusterHolder() {
+ return mqClusterConfigHolder;
}
- public ThirdPartyClusterConfig getThirdPartyClusterConfig() {
- return thirdPartyClusterConfigHolder.getClusterConfig();
+ public MQClusterConfig getMqClusterConfig() {
+ return mqClusterConfigHolder.getClusterConfig();
}
- public Map<String, String> getThirdPartyClusterUrl2Token() {
- return thirdPartyClusterConfigHolder.getUrl2token();
+ public Map<String, String> getMqClusterUrl2Token() {
+ return mqClusterConfigHolder.getUrl2token();
}
/**
@@ -219,16 +213,16 @@ public class ConfigManager {
private final CloseableHttpClient httpClient;
private final Gson gson = new Gson();
private boolean isRunning = true;
-
- public static ReloadConfigWorker create(ConfigManager managerInstance) {
- return new ReloadConfigWorker(managerInstance);
- }
private ReloadConfigWorker(ConfigManager managerInstance) {
this.configManager = managerInstance;
this.httpClient = constructHttpClient();
}
+ public static ReloadConfigWorker create(ConfigManager managerInstance) {
+ return new ReloadConfigWorker(managerInstance);
+ }
+
private synchronized CloseableHttpClient constructHttpClient() {
long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
RequestConfig requestConfig = RequestConfig.custom()
@@ -244,15 +238,14 @@ public class ConfigManager {
}
private long getSleepTime() {
- String sleepTimeInMsStr =
- configManager.getCommonProperties().get("configCheckInterval");
+ String sleepTimeInMsStr = configManager.getCommonProperties().get("configCheckInterval");
long sleepTimeInMs = 10000;
try {
if (sleepTimeInMsStr != null) {
sleepTimeInMs = Long.parseLong(sleepTimeInMsStr);
}
- } catch (Exception ignored) {
- LOG.info("ignored Exception ", ignored);
+ } catch (Exception e) {
+ LOG.info("ignored exception ", e);
}
return sleepTimeInMs + getRandom(0, 5000);
}
@@ -262,9 +255,7 @@ public class ConfigManager {
}
private void checkLocalFile() {
-
for (ConfigHolder holder : CONFIG_HOLDER_LIST) {
-
boolean isChanged = holder.checkAndUpdateHolder();
if (isChanged) {
holder.executeCallbacks();
@@ -272,15 +263,16 @@ public class ConfigManager {
}
}
- private boolean checkWithManager(String host, String proxyClusterName) {
+ private boolean checkWithManager(String host, String clusterName) {
+ if (StringUtils.isEmpty(clusterName)) {
+ LOG.error("proxyClusterName is null");
+ return false;
+ }
+
HttpGet httpGet = null;
try {
- if (StringUtils.isEmpty(proxyClusterName)) {
- LOG.error("proxyClusterName is null");
- return false;
- }
- String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
- + proxyClusterName;
+ String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH
+ + "?clusterName=" + clusterName;
LOG.info("start to request {} to get config info", url);
httpGet = new HttpGet(url);
httpGet.addHeader(HttpHeaders.CONNECTION, "close");
@@ -289,37 +281,35 @@ public class ConfigManager {
CloseableHttpResponse response = httpClient.execute(httpGet);
String returnStr = EntityUtils.toString(response.getEntity());
// get groupId <-> topic and m value.
-
RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
- Map<String, String> groupIdToTopic = new HashMap<String, String>();
- Map<String, String> groupIdToMValue = new HashMap<String, String>();
- Map<String, String> mqConfig = new HashMap<>();// include url2token and other params
-
- if (configJson.isSuccess() && configJson.getData() != null) { //success get config
- LOG.info("getConfig_v2 result: {}", returnStr);
+ Map<String, String> groupIdToTopic = new HashMap<>();
+ Map<String, String> groupIdToMValue = new HashMap<>();
+ // include url2token and other params
+ Map<String, String> mqConfig = new HashMap<>();
+
+ // get config success
+ if (configJson.isSuccess() && configJson.getData() != null) {
+ LOG.info("getConfig result: {}", returnStr);
/*
* get mqUrls <->token maps;
- * if mq is pulsar, store format: third-party-cluster.index1=cluster1url1,cluster1url2=token
- * if mq is tubemq, token is "", store format: third-party-cluster.index1=cluster1url1,cluster1url2=
+ * if mq is pulsar, store format: mq_cluster.index1=cluster1url1,cluster1url2=token
+ * if mq is tubemq, token is "", store format: mq_cluster.index1=cluster1url1,cluster1url2=
*/
int index = 1;
- List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
+ List<MQClusterInfo> clusterSet = configJson.getData().getMqClusterList();
if (clusterSet == null || clusterSet.isEmpty()) {
LOG.error("getConfig from manager: no available mq config");
return false;
}
- for (ThirdPartyClusterInfo mqCluster : clusterSet) {
- String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
- String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
- + mqCluster.getToken();
+ for (MQClusterInfo mqCluster : clusterSet) {
+ String key = MQClusterConfigHolder.URL_STORE_PREFIX + index;
+ String value =
+ mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR + mqCluster.getToken();
mqConfig.put(key, value);
++index;
}
- // mq other params
- mqConfig.putAll(clusterSet.get(0).getParams());
-
- for (DataProxyConfig topic : configJson.getData().getTopicList()) {
+ for (DataProxyTopicInfo topic : configJson.getData().getTopicList()) {
if (!StringUtils.isEmpty(topic.getM())) {
groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
}
@@ -329,14 +319,16 @@ public class ConfigManager {
}
configManager.addMxProperties(groupIdToMValue);
configManager.addTopicProperties(groupIdToTopic);
- configManager.updateThirdPartyClusterProperties(mqConfig);
+ // other params for mq
+ mqConfig.putAll(clusterSet.get(0).getParams());
+ configManager.updateMQClusterProperties(mqConfig);
// store mq common configs and url2token
- configManager.getThirdPartyClusterConfig().putAll(mqConfig);
- configManager.getThirdPartyClusterHolder()
- .setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
+ configManager.getMqClusterConfig().putAll(mqConfig);
+ configManager.getMqClusterHolder()
+ .setUrl2token(configManager.getMqClusterHolder().getUrl2token());
} else {
- LOG.error("getConfig from manager: {}", configJson.getErrMsg());
+ LOG.error("getConfig from manager error: {}", configJson.getErrMsg());
}
} catch (Exception ex) {
LOG.error("exception caught", ex);
@@ -358,7 +350,6 @@ public class ConfigManager {
}
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
-
if (checkWithManager(host, proxyClusterName)) {
break;
}
@@ -372,7 +363,6 @@ public class ConfigManager {
public void run() {
long count = 0;
while (isRunning) {
-
long sleepTimeInMs = getSleepTime();
count += 1;
try {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 7182d2336..5073acdb6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -17,13 +17,13 @@
package org.apache.inlong.dataproxy.config;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
public class RemoteConfigJson {
private boolean success;
private String errMsg;
- private ThirdPartyClusterDTO data;
+ private DataProxyConfig data;
public boolean isSuccess() {
return success;
@@ -33,7 +33,7 @@ public class RemoteConfigJson {
return errMsg;
}
- public ThirdPartyClusterDTO getData() {
+ public DataProxyConfig getData() {
return data;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index 861e364e6..228d76bb9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -18,7 +18,6 @@
package org.apache.inlong.dataproxy.config;
import com.google.gson.Gson;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.http.HttpHeaders;
@@ -41,6 +40,7 @@ import org.apache.inlong.common.pojo.dataproxy.ProxySink;
import org.apache.inlong.common.pojo.dataproxy.ProxySource;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,25 +61,26 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RemoteConfigManager implements IRepository {
- private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
- public static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
- public static final String KEY_SET_NAME = "set.name";
- public static final char FLUME_SEPARATOR = '.';
+ private static final String KEY_PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+ private static final char FLUME_SEPARATOR = '.';
+ private static final String KEY_CONFIG_CHECK_INTERVAL = "configCheckInterval";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RemoteConfigManager.class);
+ private static final Gson GSON = new Gson();
private static volatile boolean isInit = false;
private static RemoteConfigManager instance = null;
+ private final AtomicInteger managerIpListIndex = new AtomicInteger(0);
+ private final AtomicReference<DataProxyCluster> currentClusterConfigRef = new AtomicReference<>();
+ private String dataProxyConfigMd5;
+
private long reloadInterval;
private Timer reloadTimer;
- //
private IManagerIpListParser ipListParser;
private CloseableHttpClient httpClient;
- private Gson gson = new Gson();
- private AtomicInteger managerIpListIndex = new AtomicInteger(0);
- // config
- private String dataProxyConfigMd5;
- private AtomicReference<DataProxyCluster> currentClusterConfigRef = new AtomicReference<>();
+
// flume properties
private Map<String, String> flumeProperties;
// inlong id map
@@ -90,7 +91,7 @@ public class RemoteConfigManager implements IRepository {
/**
* get instance for manager
- *
+ *
* @return RemoteConfigManager
*/
@SuppressWarnings("unchecked")
@@ -129,16 +130,29 @@ public class RemoteConfigManager implements IRepository {
}
/**
- * reload
+ * constructHttpClient
+ */
+ private static synchronized CloseableHttpClient constructHttpClient() {
+ long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout((int) timeoutInMs)
+ .setSocketTimeout((int) timeoutInMs).build();
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.setDefaultRequestConfig(requestConfig);
+ return httpClientBuilder.build();
+ }
+
+ /**
+ * Reload config
*/
public void reload() {
- LOGGER.info("start to reload config.");
+ LOGGER.info("start to reload config");
String proxyClusterName = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_NAME);
- String setName = CommonPropertiesHolder.getString(KEY_SET_NAME);
- if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(setName)) {
+ String proxyClusterTag = CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
+ if (StringUtils.isBlank(proxyClusterName) || StringUtils.isBlank(proxyClusterTag)) {
return;
}
- //
+
this.ipListParser.setCommonProperties(CommonPropertiesHolder.get());
List<String> managerIpList = this.ipListParser.getIpList();
if (managerIpList == null || managerIpList.size() == 0) {
@@ -147,12 +161,12 @@ public class RemoteConfigManager implements IRepository {
int managerIpSize = managerIpList.size();
for (int i = 0; i < managerIpList.size(); i++) {
String host = managerIpList.get(Math.abs(managerIpListIndex.getAndIncrement()) % managerIpSize);
- if (this.reloadDataProxyConfig(proxyClusterName, setName, host)) {
+ if (this.reloadDataProxyConfig(proxyClusterName, proxyClusterTag, host)) {
break;
}
}
- LOGGER.info("end to reload config.");
+ LOGGER.info("success to reload config");
}
/**
@@ -166,15 +180,12 @@ public class RemoteConfigManager implements IRepository {
/**
* reloadDataProxyConfig
- *
- * @param host
- * @return
*/
- private boolean reloadDataProxyConfig(String proxyClusterName, String setName, String host) {
+ private boolean reloadDataProxyConfig(String clusterName, String clusterTag, String host) {
HttpGet httpGet = null;
try {
- String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getAllConfig?clusterName="
- + proxyClusterName + "&setName=" + setName;
+ String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH
+ + "?clusterName=" + clusterName + "&clusterTag=" + clusterTag;
if (StringUtils.isNotBlank(this.dataProxyConfigMd5)) {
url += "&md5=" + this.dataProxyConfigMd5;
}
@@ -188,7 +199,7 @@ public class RemoteConfigManager implements IRepository {
LOGGER.info("end to request {} to get config info:{}", url, returnStr);
// get groupId <-> topic and m value.
- DataProxyConfigResponse proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class);
+ DataProxyConfigResponse proxyResponse = GSON.fromJson(returnStr, DataProxyConfigResponse.class);
if (!proxyResponse.isResult()) {
LOGGER.info("Fail to get config info from url:{}, error code is {}", url, proxyResponse.getErrCode());
return false;
@@ -216,24 +227,9 @@ public class RemoteConfigManager implements IRepository {
return true;
}
- /**
- * constructHttpClient
- *
- * @return
- */
- private static synchronized CloseableHttpClient constructHttpClient() {
- long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout((int) timeoutInMs)
- .setSocketTimeout((int) timeoutInMs).build();
- HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
- httpClientBuilder.setDefaultRequestConfig(requestConfig);
- return httpClientBuilder.build();
- }
-
/**
* getZone
- *
+ *
* @return
*/
public String getZone() {
@@ -245,9 +241,7 @@ public class RemoteConfigManager implements IRepository {
}
/**
- * getProxyClusterName
- *
- * @return
+ * Get proxy cluster name
*/
public String getProxyClusterName() {
DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
@@ -258,16 +252,14 @@ public class RemoteConfigManager implements IRepository {
}
/**
- * getProxyClusterName
- *
- * @return
+ * Get proxy cluster tag
*/
- public String getSetName() {
+ public String getProxyClusterTag() {
DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
if (currentClusterConfig != null) {
return currentClusterConfig.getProxyCluster().getSetName();
}
- return CommonPropertiesHolder.getString(KEY_SET_NAME);
+ return CommonPropertiesHolder.getString(KEY_PROXY_CLUSTER_TAG);
}
/**
@@ -285,7 +277,6 @@ public class RemoteConfigManager implements IRepository {
}
/**
- *
* generateFlumeProperties
*/
private void generateFlumeProperties() {
@@ -302,9 +293,6 @@ public class RemoteConfigManager implements IRepository {
/**
* generateFlumeChannels
- *
- * @param proxyClusterObject
- * @param newConfig
*/
private void generateFlumeChannels(Map<String, String> newConfig) {
StringBuilder builder = new StringBuilder();
@@ -341,9 +329,6 @@ public class RemoteConfigManager implements IRepository {
/**
* generateFlumeSink
- *
- * @param proxyClusterObject
- * @param newConfig
*/
private void generateFlumeSinks(Map<String, String> newConfig) {
StringBuilder builder = new StringBuilder();
@@ -414,9 +399,6 @@ public class RemoteConfigManager implements IRepository {
/**
* generateFlumeSources
- *
- * @param proxyClusterObject
- * @param newConfig
*/
private void generateFlumeSources(Map<String, String> newConfig) {
StringBuilder builder = new StringBuilder();
@@ -468,8 +450,6 @@ public class RemoteConfigManager implements IRepository {
/**
* getFlumeProperties
- *
- * @return
*/
public Map<String, String> getFlumeProperties() {
return flumeProperties;
@@ -477,28 +457,20 @@ public class RemoteConfigManager implements IRepository {
/**
* getInlongIdMap
- *
- * @return
*/
public Map<String, InLongIdObject> getInlongIdMap() {
return inlongIdMap;
}
/**
- *
* getCurrentClusterConfig
- *
- * @return
*/
public DataProxyCluster getCurrentClusterConfig() {
- DataProxyCluster currentClusterConfig = currentClusterConfigRef.get();
- return currentClusterConfig;
+ return currentClusterConfigRef.get();
}
/**
* get currentClusterConfigRef
- *
- * @return the currentClusterConfigRef
*/
public AtomicReference<DataProxyCluster> getCurrentClusterConfigRef() {
return currentClusterConfigRef;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index a811db310..137d5b7f1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- *
* CommonPropertiesHolder
*/
public class CommonPropertiesHolder {
@@ -38,7 +37,7 @@ public class CommonPropertiesHolder {
public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
- public static final String KEY_CLUSTER_ID = "proxy.cluster.name";
+ public static final String KEY_PROXY_CLUSTER_NAME = "proxy.cluster.name";
public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
public static final boolean DEFAULT_RESPONSE_AFTER_SAVE = false;
public static final String KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
@@ -85,7 +84,7 @@ public class CommonPropertiesHolder {
/**
* get props
- *
+ *
* @return the props
*/
public static Map<String, String> get() {
@@ -98,10 +97,10 @@ public class CommonPropertiesHolder {
/**
* Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
*/
public static String getString(String key, String defaultValue) {
return get().getOrDefault(key, defaultValue);
@@ -109,9 +108,9 @@ public class CommonPropertiesHolder {
/**
* Gets value mapped to key, returning null if unmapped.
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
*/
public static String getString(String key) {
return get().get(key);
@@ -119,10 +118,10 @@ public class CommonPropertiesHolder {
/**
* getStringFromContext
- *
- * @param context
- * @param key
- * @param defaultValue
+ *
+ * @param context
+ * @param key
+ * @param defaultValue
* @return
*/
public static String getStringFromContext(Context context, String key, String defaultValue) {
@@ -133,10 +132,10 @@ public class CommonPropertiesHolder {
/**
* Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
*/
public static Integer getInteger(String key, Integer defaultValue) {
String value = get().get(key);
@@ -153,9 +152,9 @@ public class CommonPropertiesHolder {
* mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
* return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
* </p>
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
*/
public static Integer getInteger(String key) {
return getInteger(key, null);
@@ -163,10 +162,10 @@ public class CommonPropertiesHolder {
/**
* Gets value mapped to key, returning defaultValue if unmapped.
- *
- * @param key to be found
- * @param defaultValue returned if key is unmapped
- * @return value associated with key
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
*/
public static Long getLong(String key, Long defaultValue) {
String value = get().get(key);
@@ -183,9 +182,9 @@ public class CommonPropertiesHolder {
* mapped to a value and by returning the primitive object wrapper we can return null. If the key does not exist the
* return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be thrown.
* </p>
- *
- * @param key to be found
- * @return value associated with key or null if unmapped
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
*/
public static Long getLong(String key) {
return getLong(key, null);
@@ -193,7 +192,7 @@ public class CommonPropertiesHolder {
/**
* getAuditFormatInterval
- *
+ *
* @return
*/
public static long getAuditFormatInterval() {
@@ -202,6 +201,7 @@ public class CommonPropertiesHolder {
/**
* isResponseAfterSave
+ *
* @return
*/
public static boolean isResponseAfterSave() {
@@ -210,6 +210,7 @@ public class CommonPropertiesHolder {
/**
* get maxResponseTimeout
+ *
* @return the maxResponseTimeout
*/
public static long getMaxResponseTimeout() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
similarity index 75%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
index c15ca189c..e2cd3c8c1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/ThirdPartyClusterConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MQClusterConfigHolder.java
@@ -18,27 +18,23 @@
package org.apache.inlong.dataproxy.config.holder;
import com.google.common.base.Splitter;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * value is map
+ * Holder of the MQ cluster config.
*/
-public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
+public class MQClusterConfigHolder extends PropertiesConfigHolder {
- private static final Logger LOG = LoggerFactory.getLogger(ThirdPartyClusterConfigHolder.class);
- private final ThirdPartyClusterConfig clusterConfig = new ThirdPartyClusterConfig();
+ public static final String URL_STORE_PREFIX = "mq_cluster.index";
+ private final MQClusterConfig clusterConfig = new MQClusterConfig();
- public static final String URL_STORE_PREFIX = "third-party-cluster.index";
-
- public ThirdPartyClusterConfigHolder(String fileName) {
+ public MQClusterConfigHolder(String fileName) {
super(fileName);
}
@@ -49,7 +45,6 @@ public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
public void loadFromFileToHolder() {
super.loadFromFileToHolder();
Map<String, String> tmpUrl2token = new HashMap<>();
- Map<String, String> tmpConfig = new HashMap<>();
for (Map.Entry<String, String> entry : getHolder().entrySet()) {
if (entry.getKey().startsWith(URL_STORE_PREFIX)) {
List<String> kv = Splitter.on(AttributeConstants.KEY_VALUE_SEPARATOR)
@@ -64,15 +59,15 @@ public class ThirdPartyClusterConfigHolder extends PropertiesConfigHolder {
clusterConfig.putAll(getHolder());
}
- public void setUrl2token(Map<String, String> newUrl2Token) {
- clusterConfig.setUrl2token(newUrl2Token);
- }
-
public Map<String, String> getUrl2token() {
return clusterConfig.getUrl2token();
}
- public ThirdPartyClusterConfig getClusterConfig() {
+ public void setUrl2token(Map<String, String> newUrl2Token) {
+ clusterConfig.setUrl2token(newUrl2Token);
+ }
+
+ public MQClusterConfig getClusterConfig() {
return clusterConfig;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
index 4a193a3ac..c332de8fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/PropertiesConfigHolder.java
@@ -141,7 +141,7 @@ public class PropertiesConfigHolder extends ConfigHolder {
try {
inStream.close();
} catch (IOException e) {
- LOG.error("fail to close input stream at loadTopics from {}, err {}", fileName, e);
+ LOG.error("fail to loadTopics in inStream.close for file: {}", fileName, e);
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
index d029442d3..2e2c7c63a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,51 +17,43 @@
package org.apache.inlong.dataproxy.config.loader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
- *
- * FileCommonPropertiesLoader
+ * Class resource common properties loader
*/
public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
- public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+ private static final String FILE_NAME = "common.properties";
/**
- * load
- *
- * @return
+ * load properties
*/
@Override
public Map<String, String> load() {
- return this.loadProperties("common.properties");
+ return this.loadProperties();
}
- /**
- * loadProperties
- *
- * @param fileName
- * @return
- */
- protected Map<String, String> loadProperties(String fileName) {
+ protected Map<String, String> loadProperties() {
Map<String, String> result = new ConcurrentHashMap<>();
- try (InputStream inStream = getClass().getClassLoader().getResource(fileName).openStream()) {
+ URL resource = getClass().getClassLoader().getResource(FILE_NAME);
+ try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
Properties props = new Properties();
props.load(inStream);
for (Map.Entry<Object, Object> entry : props.entrySet()) {
result.put((String) entry.getKey(), (String) entry.getValue());
}
- } catch (UnsupportedEncodingException e) {
- LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
} catch (Exception e) {
- LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+ LOG.error("fail to load properties from file ={}", FILE_NAME, e);
}
return result;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
index fcf839ddf..92d8f7061 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -20,15 +20,15 @@ package org.apache.inlong.dataproxy.config.loader;
import java.util.Map;
/**
- *
- * CommonPropertiesLoader
+ * Interface of common properties loader
*/
public interface CommonPropertiesLoader {
/**
* load
- *
- * @return
+ *
+ * @return the configuration map
*/
Map<String, String> load();
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
similarity index 84%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
index 27639e663..5a941096b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/ThirdPartyClusterConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java
@@ -24,112 +24,124 @@ import org.apache.pulsar.shade.io.netty.util.internal.SystemPropertyUtil;
import java.util.HashMap;
import java.util.Map;
-public class ThirdPartyClusterConfig extends Context {
+public class MQClusterConfig extends Context {
- private Map<String, String> url2token = new HashMap<>();
-
- /*
- * properties key for pulsar client
- */
- public static final String MQ_SERVER_URL_LIST = "mq_server_url_list";
- /*
- * properties key pulsar producer
- */
- public static final String TOKEN = "mq_token";
- public static final String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+ private static final String SEND_REMOTE = "send_remote";
+ private static final boolean DEFAULT_SEND_REMOTE = false;
private static final String SEND_TIMEOUT = "send_timeout_mill";
+ private static final int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
+
+ private static final String CLIENT_ID_CACHE = "client_id_cache";
+ private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
private static final String CLIENT_TIMEOUT = "client_timeout_second";
+ private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
+
private static final String ENABLE_BATCH = "enable_batch";
+ private static final boolean DEFAULT_ENABLE_BATCH = true;
+
private static final String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
+ private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+
private static final String MAX_PENDING_MESSAGES = "max_pending_messages";
- private static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
- "max_pending_messages_across_partitions";
+ private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+
+ private static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "max_pending_messages_across_partitions";
+ private static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 500000;
+
private static final String COMPRESSION_TYPE = "compression_type";
+ private static final String DEFAULT_COMPRESSION_TYPE = "NONE";
+
private static final String MAX_BATCHING_MESSAGES = "max_batching_messages";
- private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
- private static final String SINK_THREAD_NUM = "thread_num";
- private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
- private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
- private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
+ private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+
private static final String MAX_BATCHING_BYTES = "max_batching_bytes";
- private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS =
- "max_batching_publish_delay_millis";
- private static final String EVENT_QUEUE_SIZE = "event_queue_size";
- private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
- private static final String MAX_RETRY_SEND_TIMES = "max_retry_send_times";
+ private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
- private static final String SLA_METRIC_SINK = "sla_metric_sink";
+ private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS = "max_batching_publish_delay_millis";
+ private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
+ private static final String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
+ private static final long DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = 30 * 1000L;
- // log params
- private static final String LOG_TOPIC = "proxy_log_topic";
- private static final String LOG_STREAMID = "proxy_log_streamid";
- private static final String LOG_GROUPID = "proxy_log_groupid";
- private static final String SEND_REMOTE = "send_remote";
+ private static final String SLA_METRIC_SINK = "sla_metric_sink";
+ private static final boolean DEFAULT_SLA_METRIC_SINK = false;
- // tubemq params
+ // TubeMQ params
private static final String MAX_SURVIVED_TIME = "max_survived_time";
+ private static final int DEFAULT_MAX_SURVIVED_TIME = 300000;
+
private static final String MAX_SURVIVED_SIZE = "max_survived_size";
+ private static final int DEFAULT_MAX_SURVIVED_SIZE = 3000000;
+
private static final String NEW_CHECK_PATTERN = "new_check_pattern";
+ private static final boolean DEFAULT_NEW_CHECK_PATTERN = true;
+
private static final String OLD_METRIC_ON = "old_metric_on";
+ private static final boolean DEFAULT_OLD_METRIC_ON = true;
+
private static final String SET_VALUE = "set";
+ private static final int DEFAULT_SET_VALUE = 10;
+
private static final String MAX_TOPICS_EACH_PRODUCER_HOLD = "max_topic_each_producer_hold";
+ private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
+
private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout";
- public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
- public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
- public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
- public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
- public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+ private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
+
+ private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
+ private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
+
+ private static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
+ private static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
+
+ private static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
+ private static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
+
+ private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
+ private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
+
+ private static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+ private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+
+ // log params
+ private static final String LOG_TOPIC = "proxy_log_topic";
+ private static final String DEFAULT_LOG_TOPIC = "manager";
+ private static final String LOG_GROUP_ID = "proxy_log_groupid";
+ private static final String DEFAULT_LOG_GROUP_ID = "proxy_measure_log";
+ private static final String LOG_STREAM_ID = "proxy_log_streamid";
+ private static final String DEFAULT_LOG_STREAM_ID = "manager";
+ private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
+ private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
/*
- * properties for stat
+ * stat params
*/
private static final String STAT_INTERVAL_SEC = "stat_interval_sec";
- private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
- private static final String CLIENT_ID_CACHE = "client_id_cache";
-
- public static String PULSAR_DEFAULT_AUTH_TYPE = "token";
- private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
- private static final int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
- private static final long DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = 30 * 1000L;
- private static final boolean DEFAULT_ENABLE_BATCH = true;
- private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
- private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
- private static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 500000;
- private static final String DEFAULT_COMPRESSION_TYPE = "NONE";
- private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
- private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
- private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
- private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
private static final int DEFAULT_STAT_INTERVAL_SEC = 60;
+
+ /*
+ * Pulsar params
+ */
+ private static final String SINK_THREAD_NUM = "thread_num";
private static final int DEFAULT_THREAD_NUM = 4;
- private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
+ private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
private static final long DEFAULT_DISK_IO_RATE_PER_SEC = 0L;
- private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
- private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+ private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
private static final int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
+ private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
private static final int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
+ private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+ private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
+ private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
+ private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+ private static final String MAX_RETRY_SEND_TIMES = "max_retry_send_times";
private static final int DEFAULT_MAX_RETRY_SEND_TIMES = 16;
- private static final boolean DEFAULT_SLA_METRIC_SINK = false;
- private static final String DEFAULT_LOG_TOPIC = "teg_manager";
- private static final String DEFAULT_LOG_STREAMID = "b_teg_manager";
- private static final String DEFAULT_LOG_GROUPID = "proxy_measure_log";
- private static final boolean DEFAULT_SEND_REMOTE = false;
+ public static String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+ public static String PULSAR_DEFAULT_AUTH_TYPE = "token";
- private static final int DEFAULT_MAX_SURVIVED_TIME = 300000;
- private static final int DEFAULT_MAX_SURVIVED_SIZE = 3000000;
- private static final boolean DEFAULT_NEW_CHECK_PATTERN = true;
- private static final boolean DEFAULT_OLD_METRIC_ON = true;
- private static final int DEFAULT_SET_VALUE = 10;
- private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
- private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60;
- public static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
- public static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
- public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
- public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
- public static int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+ private Map<String, String> url2token = new HashMap<>();
public long getLinkMaxAllowedDelayedMsgCount() {
return getLong(LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT);
@@ -281,11 +293,11 @@ public class ThirdPartyClusterConfig extends Context {
}
public String getLogStreamId() {
- return getString(LOG_STREAMID, DEFAULT_LOG_STREAMID);
+ return getString(LOG_STREAM_ID, DEFAULT_LOG_STREAM_ID);
}
public String getLogGroupId() {
- return getString(LOG_GROUPID, DEFAULT_LOG_GROUPID);
+ return getString(LOG_GROUP_ID, DEFAULT_LOG_GROUP_ID);
}
public boolean getEnableSendRemote() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index c4cbbd5d2..657480d78 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -17,73 +17,76 @@
package org.apache.inlong.dataproxy.consts;
-public interface AttributeConstants {
-
- String SEPARATOR = "&";
- String KEY_VALUE_SEPARATOR = "=";
-
- String REQUEST_TYPE = "requestType";
+/**
+ * Attribute constants
+ */
+public class AttributeConstants {
- String OPERATION_TYPE = "operationType";
+ public static final String SEPARATOR = "&";
+ public static final String KEY_VALUE_SEPARATOR = "=";
- String OPERATION_CONTENT = "content";
+ public static final String BODY = "body";
+ public static final String CHARSET = "UTF-8";
+ public static final String HTTP_REQUEST = "http-request";
+ public static final String HTTP_RESPONSE = "http-response";
/**
* group id unique string id for each business or product
*/
- String GROUP_ID = "groupId";
+ public static final String GROUP_ID = "groupId";
/**
* interface id unique string id for each interface of business An interface stand for a kind of
* data
*/
- String STREAM_ID = "streamId";
+ public static final String STREAM_ID = "streamId";
/**
* iname is like a streamId but used in file protocol(m=xxx)
*/
- String INAME = "iname";
+ public static final String INAME = "iname";
/**
* data time
*/
- String DATA_TIME = "dt";
+ public static final String DATA_TIME = "dt";
- String TIME_STAMP = "t";
+ public static final String TIME_STAMP = "t";
/* compress type */
- String COMPRESS_TYPE = "cp";
+ public static final String COMPRESS_TYPE = "cp";
/* count value for how many records a message body contains */
- String MESSAGE_COUNT = "cnt";
+ public static final String MESSAGE_COUNT = "cnt";
/* message type */
- String MESSAGE_TYPE = "mt";
+ public static final String MESSAGE_TYPE = "mt";
/* sort type */
- String METHOD = "m";
+ public static final String METHOD = "m";
/* global unique id for a message*/
- String SEQUENCE_ID = "sid";
+ public static final String SEQUENCE_ID = "sid";
- String UNIQ_ID = "uniq";
+ public static final String UNIQ_ID = "uniq";
/* from where */
- String FROM = "f";
+ public static final String FROM = "f";
+
+ public static final String RCV_TIME = "rt";
- String RCV_TIME = "rt";
+ public static final String NODE_IP = "NodeIP";
- String NODE_IP = "NodeIP";
+ public static final String NUM2NAME = "num2name";
- String NUM2NAME = "num2name";
+ public static final String GROUPID_NUM = "groupIdnum";
- String GROUPID_NUM = "groupIdnum";
+ public static final String STREAMID_NUM = "streamIdnum";
- String STREAMID_NUM = "streamIdnum";
+ public static final String MESSAGE_PARTITION_KEY = "partitionKey";
- String MESSAGE_PARTITION_KEY = "partitionKey";
+ public static final String MESSAGE_SYNC_SEND = "syncSend";
- String MESSAGE_SYNC_SEND = "syncSend";
+ public static final String MESSAGE_IS_ACK = "isAck";
- String MESSAGE_IS_ACK = "isAck";
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index e3b6d8ea7..3e86620eb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -118,4 +118,8 @@ public class ConfigConstants {
public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+ public static final String MANAGER_PATH = "/api/inlong/manager/openapi";
+ public static final String MANAGER_GET_CONFIG_PATH = "/dataproxy/getConfig";
+ public static final String MANAGER_GET_ALL_CONFIG_PATH = "/dataproxy/getAllConfig";
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
index 87c643f6e..c40ff0553 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/Context.java
@@ -31,7 +31,7 @@ public interface Context {
void clear();
- void destory();
+ void destroy();
boolean containsKey(String key);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 1535e172b..053209b3b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -18,30 +18,29 @@
package org.apache.inlong.dataproxy.http;
import com.google.common.base.Preconditions;
-import org.apache.inlong.common.monitor.CounterGroup;
-import org.apache.inlong.common.monitor.CounterGroupExt;
-import org.apache.inlong.common.monitor.StatConstants;
-import org.apache.inlong.common.monitor.StatRunner;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.dataproxy.utils.ConfStringUtils;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.common.monitor.CounterGroup;
+import org.apache.inlong.common.monitor.CounterGroupExt;
+import org.apache.inlong.common.monitor.StatConstants;
+import org.apache.inlong.common.monitor.StatRunner;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.ConfStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HttpBaseSource
- extends AbstractSource
- implements EventDrivenSource, Configurable {
+import java.util.HashSet;
+import java.util.Set;
- private static final Logger logger = LoggerFactory.getLogger(HttpBaseSource.class);
+public class HttpBaseSource extends AbstractSource implements EventDrivenSource, Configurable {
+ private static final Logger logger = LoggerFactory.getLogger(HttpBaseSource.class);
+ private static final String CONNECTIONS = "connections";
protected int port;
protected String host = null;
protected int maxMsgLength;
@@ -49,18 +48,14 @@ public class HttpBaseSource
protected String attr;
protected String messageHandlerName;
protected boolean filterEmptyMsg;
- private int statIntervalSec;
-
protected CounterGroup counterGroup;
protected CounterGroupExt counterGroupExt;
-
- private StatRunner statRunner;
- private Thread statThread;
protected int maxConnections = Integer.MAX_VALUE;
- private static final String CONNECTIONS = "connections";
-
protected boolean customProcessor = false;
protected Context context;
+ private int statIntervalSec;
+ private StatRunner statRunner;
+ private Thread statThread;
public HttpBaseSource() {
super();
@@ -71,14 +66,13 @@ public class HttpBaseSource
@Override
public synchronized void start() {
if (statIntervalSec > 0) {
- Set<String> moniterNames = new HashSet<String>();
- moniterNames.add(StatConstants.EVENT_SUCCESS);
- moniterNames.add(StatConstants.EVENT_DROPPED);
- moniterNames.add(StatConstants.EVENT_EMPTY);
- moniterNames.add(StatConstants.EVENT_OTHEREXP);
- moniterNames.add(StatConstants.EVENT_INVALID);
- statRunner = new StatRunner(getName(), counterGroup, counterGroupExt, statIntervalSec,
- moniterNames);
+ Set<String> monitorNames = new HashSet<>();
+ monitorNames.add(StatConstants.EVENT_SUCCESS);
+ monitorNames.add(StatConstants.EVENT_DROPPED);
+ monitorNames.add(StatConstants.EVENT_EMPTY);
+ monitorNames.add(StatConstants.EVENT_OTHEREXP);
+ monitorNames.add(StatConstants.EVENT_INVALID);
+ statRunner = new StatRunner(getName(), counterGroup, counterGroupExt, statIntervalSec, monitorNames);
statThread = new Thread(statRunner);
statThread.setName("Thread-Stat-" + this.getName());
statThread.start();
@@ -107,7 +101,7 @@ public class HttpBaseSource
}
} catch (InterruptedException e) {
- logger.warn("statrunner interrupted");
+ logger.warn("start runner interrupted");
}
}
@@ -116,7 +110,6 @@ public class HttpBaseSource
/**
* configure
- * @param context
*/
public void configure(Context context) {
this.context = context;
@@ -157,7 +150,7 @@ public class HttpBaseSource
try {
maxConnections = context.getInteger(CONNECTIONS, 5000);
} catch (NumberFormatException e) {
- logger.warn("BaseSource\'s \"connections\" property must specify an integer value. {}",
+ logger.warn("BaseSource connections property must specify an integer value {}",
context.getString(CONNECTIONS));
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java
deleted file mode 100644
index 49b06b436..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpSourceConstants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.http;
-
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
-
-public interface HttpSourceConstants
- extends AttributeConstants {
-
- String BODY = "body";
- String CHARSET = "UTF-8";
-
- String HTTP_REQUEST = "http-request";
- String HTTP_RESPONSE = "http-response";
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
index 497848670..550208de5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MappedContext.java
@@ -22,8 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-public class MappedContext
- implements Context {
+public class MappedContext implements Context {
private Map<String, Object> mapContext;
@@ -33,7 +32,7 @@ public class MappedContext
@Override
public void init() {
- mapContext = new HashMap();
+ mapContext = new HashMap<>();
}
@Override
@@ -57,7 +56,7 @@ public class MappedContext
}
@Override
- public void destory() {
+ public void destroy() {
mapContext = Collections.emptyMap();
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 0b3426019..af903b482 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -17,12 +17,12 @@
package org.apache.inlong.dataproxy.http;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.GROUP_ID;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.DATA_TIME;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.STREAM_ID;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.BODY;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.ChannelException;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -31,19 +31,11 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.lang.StringUtils;
-import org.apache.flume.ChannelException;
-import org.apache.inlong.common.monitor.LogCounter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
-public class MessageFilter
- implements Filter {
+public class MessageFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(MessageFilter.class);
- private static final LogCounter logCounter = new LogCounter(10,
- 100000, 60 * 1000);
private final int maxMsgLength;
@@ -56,8 +48,7 @@ public class MessageFilter
}
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
- throws IOException, ServletException {
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;
@@ -76,10 +67,10 @@ public class MessageFilter
}
String invalidKey = null;
- String groupId = req.getParameter(GROUP_ID);
- String streamId = req.getParameter(STREAM_ID);
- String dt = req.getParameter(DATA_TIME);
- String body = req.getParameter(BODY);
+ String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+ String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+ String dt = req.getParameter(AttributeConstants.DATA_TIME);
+ String body = req.getParameter(AttributeConstants.BODY);
if (StringUtils.isEmpty(groupId)) {
invalidKey = "groupId";
@@ -92,7 +83,6 @@ public class MessageFilter
}
try {
-
if (invalidKey != null) {
LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
code = StatusCode.ILLEGAL_ARGUMENT;
@@ -129,7 +119,7 @@ public class MessageFilter
private String getResultContent(int code, String message, String callback) {
StringBuilder builder = new StringBuilder();
if (StringUtils.isNotEmpty(callback)) {
- builder.append(callback + "(");
+ builder.append(callback).append("(");
}
builder.append("{\"code\":\"");
builder.append(code);
@@ -142,4 +132,5 @@ public class MessageFilter
return builder.toString();
}
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
index 4804fbabc..ee3d7aa9f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageHandler.java
@@ -20,8 +20,7 @@ package org.apache.inlong.dataproxy.http;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
-public interface MessageHandler
- extends Configurable {
+public interface MessageHandler extends Configurable {
void init();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
index 3e63d9497..7fe8757c4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
@@ -17,27 +17,19 @@
package org.apache.inlong.dataproxy.http;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.GROUP_ID;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.DATA_TIME;
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.STREAM_ID;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.BODY;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.HTTP_REQUEST;
-import static org.apache.inlong.dataproxy.http.HttpSourceConstants.HTTP_RESPONSE;
-
import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MessageProcessServlet
- extends HttpServlet {
+public class MessageProcessServlet extends HttpServlet {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessServlet.class);
private static final LogCounter logCounter = new LogCounter(10, 100000, 60 * 1000);
@@ -55,18 +47,16 @@ public class MessageProcessServlet
}
@Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
try {
Context context = new MappedContext();
+ context.put(AttributeConstants.GROUP_ID, req.getParameter(AttributeConstants.GROUP_ID));
+ context.put(AttributeConstants.STREAM_ID, req.getParameter(AttributeConstants.STREAM_ID));
+ context.put(AttributeConstants.DATA_TIME, req.getParameter(AttributeConstants.DATA_TIME));
+ context.put(AttributeConstants.BODY, req.getParameter(AttributeConstants.BODY));
- context.put(GROUP_ID, req.getParameter(GROUP_ID));
- context.put(STREAM_ID, req.getParameter(STREAM_ID));
- context.put(DATA_TIME, req.getParameter(DATA_TIME));
- context.put(BODY, req.getParameter(BODY));
-
- context.put(HTTP_REQUEST, req);
- context.put(HTTP_RESPONSE, resp);
+ context.put(AttributeConstants.HTTP_REQUEST, req);
+ context.put(AttributeConstants.HTTP_RESPONSE, resp);
messageHandler.processMessage(context);
} catch (MessageProcessException e) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
index b5cc6b4e3..40c9c8d41 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleHttpSource.java
@@ -46,8 +46,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SimpleHttpSource
- extends HttpBaseSource {
+public class SimpleHttpSource extends HttpBaseSource {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 9dbeec9ee..79ecfa03a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -17,21 +17,11 @@
package org.apache.inlong.dataproxy.http;
-import com.google.common.base.Splitter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
-import org.apache.flume.Event;
import org.apache.inlong.common.monitor.CounterGroup;
import org.apache.inlong.common.monitor.CounterGroupExt;
import org.apache.inlong.common.monitor.MonitorIndex;
@@ -42,43 +32,44 @@ import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
-
+import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SimpleMessageHandler
- implements MessageHandler {
+import javax.servlet.http.HttpServletRequest;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SimpleMessageHandler implements MessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
private static final ConfigManager configManager = ConfigManager.getInstance();
-
+ private static final String SEPARATOR = "#";
+ private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
+ new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyyMMddHHmm");
+ }
+ };
+ private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
private final CounterGroup counterGroup;
private final CounterGroupExt counterGroupExt;
-
- private static final String SEPARATOR = "#";
- private final boolean isNewMetricOn = true;
private final MonitorIndex monitorIndex = new MonitorIndex("Source", 60, 300000);
private final MonitorIndexExt monitorIndexExt =
new MonitorIndexExt("DataProxy_monitors#http", 60, 100000);
+ private final ChannelProcessor processor;
+
+ private final boolean isNewMetricOn = true;
@SuppressWarnings("unused")
private int maxMsgLength;
private long logCounter = 0L;
private long channelTrace = 0L;
- private static final ThreadLocal<SimpleDateFormat> dateFormator =
- new ThreadLocal<SimpleDateFormat>() {
- @Override
- protected SimpleDateFormat initialValue() {
- return new SimpleDateFormat("yyyyMMddHHmm");
- }
- };
-
- private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
- private final ChannelProcessor processor;
-
public SimpleMessageHandler(ChannelProcessor processor, CounterGroup counterGroup,
CounterGroupExt counterGroupExt, ServiceDecoder decoder) {
this.processor = processor;
@@ -98,14 +89,13 @@ public class SimpleMessageHandler
@Override
public void processMessage(Context context) throws MessageProcessException {
- String topic = "test";
- String topicValue = topic;
+ String topicValue = "test";
String attr = "m=0";
- StringBuffer newAttrBuffer = new StringBuffer(attr);
+ StringBuilder newAttrBuffer = new StringBuilder(attr);
- String groupId = (String) context.get(HttpSourceConstants.GROUP_ID);
- String streamId = (String) context.get(HttpSourceConstants.STREAM_ID);
- String dt = (String) context.get(HttpSourceConstants.DATA_TIME);
+ String groupId = (String) context.get(AttributeConstants.GROUP_ID);
+ String streamId = (String) context.get(AttributeConstants.STREAM_ID);
+ String dt = (String) context.get(AttributeConstants.DATA_TIME);
String value = getTopic(groupId, streamId);
if (null != value && !"".equals(value)) {
@@ -114,34 +104,34 @@ public class SimpleMessageHandler
String mxValue = configManager.getMxProperties().get(groupId);
if (null != mxValue) {
- newAttrBuffer = new StringBuffer(mxValue.trim());
+ newAttrBuffer = new StringBuilder(mxValue.trim());
}
newAttrBuffer.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt);
HttpServletRequest request =
- (HttpServletRequest) context.get(HttpSourceConstants.HTTP_REQUEST);
+ (HttpServletRequest) context.get(AttributeConstants.HTTP_REQUEST);
String strRemoteIP = request.getRemoteAddr();
newAttrBuffer.append("&NodeIP=").append(strRemoteIP);
- String msgCount = request.getParameter(HttpSourceConstants.MESSAGE_COUNT);
+ String msgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
if (msgCount == null || "".equals(msgCount)) {
msgCount = "1";
}
InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
- String charset = (String) context.get(HttpSourceConstants.CHARSET);
+ String charset = (String) context.get(AttributeConstants.CHARSET);
if (charset == null || "".equals(charset)) {
charset = "UTF-8";
}
- String body = (String) context.get(HttpSourceConstants.BODY);
+ String body = (String) context.get(AttributeConstants.BODY);
try {
inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
} catch (UnsupportedEncodingException e) {
throw new MessageProcessException(e);
}
- Map<String, String> headers = new HashMap<String, String>();
- headers.put(HttpSourceConstants.DATA_TIME, dt);
+ Map<String, String> headers = new HashMap<>();
+ headers.put(AttributeConstants.DATA_TIME, dt);
headers.put(ConfigConstants.TOPIC_KEY, topicValue);
headers.put(AttributeConstants.STREAM_ID, streamId);
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
@@ -149,14 +139,14 @@ public class SimpleMessageHandler
headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
- String pkgTime = dateFormator.get().format(inLongMsg.getCreatetime());
+ String pkgTime = DATE_FORMATTER.get().format(inLongMsg.getCreatetime());
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
Event event = EventBuilder.withBody(data, headers);
String counterExtKey = topicValue + "#" + 0 + "#" + strRemoteIP + "#time#" + pkgTime;
counterGroupExt.addAndGet(counterExtKey, Long.valueOf(msgCount));
- long dtten = 0;
+ long dtten;
try {
dtten = Long.parseLong(dt);
} catch (NumberFormatException e1) {
@@ -167,16 +157,15 @@ public class SimpleMessageHandler
dtten = dtten / 1000 / 60 / 10;
dtten = dtten * 1000 * 60 * 10;
- StringBuilder newbase = new StringBuilder();
- newbase.append("http").append(SEPARATOR).append(topicValue).append(SEPARATOR)
+ StringBuilder newBase = new StringBuilder();
+ newBase.append("http").append(SEPARATOR).append(topicValue).append(SEPARATOR)
.append(streamId).append(SEPARATOR).append(strRemoteIP).append(SEPARATOR)
.append(NetworkUtils.getLocalIp()).append(SEPARATOR)
.append(new SimpleDateFormat("yyyyMMddHHmm").format(dtten)).append(SEPARATOR)
.append(pkgTime);
if (isNewMetricOn) {
- monitorIndex
- .addAndGet(new String(newbase), Integer.parseInt(msgCount), 1, data.length, 0);
+ monitorIndex.addAndGet(new String(newBase), Integer.parseInt(msgCount), 1, data.length, 0);
}
inLongMsg.reset();
@@ -190,29 +179,27 @@ public class SimpleMessageHandler
counterGroup.incrementAndGet(StatConstants.EVENT_DROPPED);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
if (isNewMetricOn) {
- monitorIndex.addAndGet(new String(newbase), 0, 0, 0, Integer.parseInt(msgCount));
+ monitorIndex.addAndGet(new String(newBase), 0, 0, 0, Integer.parseInt(msgCount));
}
logCounter++;
if (logCounter == 1 || logCounter % 1000 == 0) {
- LOG.error("Error writting to channel,and will retry after 1s,ex={},"
- + "logCounter = {}, spend time={} ms", new Object[]{ex.toString(), logCounter,
- System.currentTimeMillis() - beginTime});
+ LOG.error("Error writing to channel, and will retry after 1s, ex={},"
+ + "logCounter={}, spend time={} ms", ex, logCounter, System.currentTimeMillis() - beginTime);
if (logCounter > Long.MAX_VALUE - 10) {
logCounter = 0;
- LOG.info("logCounter will reverse.");
+ LOG.info("logCounter will reverse");
}
}
throw ex;
}
channelTrace++;
if (channelTrace % 600000 == 0) {
- LOG.info("processor.processEvent spend time={} ms",
- System.currentTimeMillis() - beginTime);
+ LOG.info("processor.processEvent spend time={} ms", System.currentTimeMillis() - beginTime);
}
if (channelTrace > Long.MAX_VALUE - 10) {
channelTrace = 0;
- LOG.info("channelTrace will reverse.");
+ LOG.info("channelTrace will reverse");
}
}
@@ -220,55 +207,6 @@ public class SimpleMessageHandler
public void configure(org.apache.flume.Context context) {
}
- private Map<String, String> getAttributeMap(String attrs) {
- if (null != attrs && !"".equals(attrs)) {
- try {
- Splitter.MapSplitter mySplitter =
- Splitter.on("&").trimResults().withKeyValueSeparator("=");
- return mySplitter.split(attrs);
- } catch (Exception e) {
- LOG.error("fail to fillTopicKeyandAttr,attr:{}", attrs);
- LOG.error("error!", e);
- }
- }
- return null;
- }
-
- private Map<String, String> loadProperties(String fileName) {
- HashMap<String, String> map = new HashMap<String, String>();
- if (null == fileName) {
- LOG.error("fail to loadTopics, null == fileName.");
- return map;
- }
- InputStream inStream = null;
- try {
- URL url = getClass().getClassLoader().getResource(fileName);
- inStream = url != null ? url.openStream() : null;
- if (inStream == null) {
- LOG.error("InputStream {} is null!", fileName);
- return map;
- }
- Properties props = new Properties();
- props.load(inStream);
- for (Map.Entry<Object, Object> entry : props.entrySet()) {
- map.put((String) entry.getKey(), (String) entry.getValue());
- }
- } catch (UnsupportedEncodingException e) {
- LOG.error("fail to loadPropery, file ={}, and e= {}", fileName, e);
- } catch (Exception e) {
- LOG.error("fail to loadProperty, file ={}, and e= {}", fileName, e);
- } finally {
- if (null != inStream) {
- try {
- inStream.close();
- } catch (IOException e) {
- LOG.error("fail to loadTopics, inStream.close ,and e= {}", fileName, e);
- }
- }
- }
- return map;
- }
-
private String getTopic(String groupId, String streamId) {
String topic = null;
if (StringUtils.isNotEmpty(groupId)) {
@@ -279,8 +217,7 @@ public class SimpleMessageHandler
topic = configManager.getTopicProperties().get(groupId);
}
}
- LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId,
- topic);
+ LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId, topic);
return topic;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
index f4cf573cf..88e411573 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
@@ -22,20 +22,20 @@ public interface StatusCode {
/*
* success
*/
- public static final int SUCCESS = 1;
+ int SUCCESS = 1;
/*
* illegal argument
*/
- public static final int ILLEGAL_ARGUMENT = -100;
+ int ILLEGAL_ARGUMENT = -100;
/*
* exceed length
*/
- public static final int EXCEED_LEN = -101;
+ int EXCEED_LEN = -101;
/*
* service error
*/
- public static final int SERVICE_ERR = -105;
+ int SERVICE_ERR = -105;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 19f6e5890..380826649 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -44,7 +44,7 @@ import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
@@ -172,7 +172,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
private Map<String, String> topicProperties;
private Map<String, String> pulsarCluster;
- private ThirdPartyClusterConfig pulsarConfig;
+ private MQClusterConfig pulsarConfig;
private static final Long PRINT_INTERVAL = 30L;
@@ -215,8 +215,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
configManager = ConfigManager.getInstance();
topicProperties = configManager.getTopicProperties();
- pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
- pulsarConfig = configManager.getThirdPartyClusterConfig(); //pulsar common config
+ pulsarCluster = configManager.getMqClusterUrl2Token();
+ pulsarConfig = configManager.getMqClusterConfig(); //pulsar common config
commonProperties = configManager.getCommonProperties();
sinkThreadPoolSize = pulsarConfig.getThreadNum();
if (sinkThreadPoolSize <= 0) {
@@ -250,12 +250,12 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
}
});
- configManager.getThirdPartyClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+ configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
@Override
public void update() {
if (pulsarClientService != null) {
diffUpdatePulsarClient(pulsarClientService, pulsarCluster,
- configManager.getThirdPartyClusterUrl2Token());
+ configManager.getMqClusterUrl2Token());
}
}
});
@@ -337,7 +337,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
pulsarClientService.updatePulsarClients(this, needToClose, needToStart,
new HashSet<>(topicProperties.values()));
- pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
+ pulsarCluster = configManager.getMqClusterUrl2Token();
}
@Override
@@ -752,7 +752,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
return currentInFlightCount;
}
- public ThirdPartyClusterConfig getPulsarConfig() {
+ public MQClusterConfig getPulsarConfig() {
return pulsarConfig;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
index 5e4d430d3..dbf823bd8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,10 +17,6 @@
package org.apache.inlong.dataproxy.sink;
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
@@ -29,6 +25,10 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
/**
* SinkContext
*/
@@ -37,37 +37,33 @@ public class SinkContext {
public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class);
public static final String KEY_MAX_THREADS = "maxThreads";
- public static final String KEY_PROCESSINTERVAL = "processInterval";
- public static final String KEY_RELOADINTERVAL = "reloadInterval";
+ public static final String KEY_PROCESS_INTERVAL = "processInterval";
+ public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
protected final String clusterId;
protected final String sinkName;
protected final Context sinkContext;
protected final Channel channel;
- //
+
protected final int maxThreads;
protected final long processInterval;
protected final long reloadInterval;
- //
+
protected final DataProxyMetricItemSet metricItemSet;
protected Timer reloadTimer;
/**
* Constructor
- *
- * @param sinkName
- * @param context
- * @param channel
*/
public SinkContext(String sinkName, Context context, Channel channel) {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+ this.clusterId = context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
- this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100);
- this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
+ this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL, 100);
+ this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
//
this.metricItemSet = new DataProxyMetricItemSet(sinkName);
MetricRegister.register(this.metricItemSet);
@@ -118,7 +114,7 @@ public class SinkContext {
/**
* get clusterId
- *
+ *
* @return the clusterId
*/
public String getClusterId() {
@@ -127,7 +123,7 @@ public class SinkContext {
/**
* get sinkName
- *
+ *
* @return the sinkName
*/
public String getSinkName() {
@@ -136,7 +132,7 @@ public class SinkContext {
/**
* get sinkContext
- *
+ *
* @return the sinkContext
*/
public Context getSinkContext() {
@@ -145,7 +141,7 @@ public class SinkContext {
/**
* get channel
- *
+ *
* @return the channel
*/
public Channel getChannel() {
@@ -154,7 +150,7 @@ public class SinkContext {
/**
* get maxThreads
- *
+ *
* @return the maxThreads
*/
public int getMaxThreads() {
@@ -163,7 +159,7 @@ public class SinkContext {
/**
* get processInterval
- *
+ *
* @return the processInterval
*/
public long getProcessInterval() {
@@ -172,7 +168,7 @@ public class SinkContext {
/**
* get reloadInterval
- *
+ *
* @return the reloadInterval
*/
public long getReloadInterval() {
@@ -181,7 +177,7 @@ public class SinkContext {
/**
* get metricItemSet
- *
+ *
* @return the metricItemSet
*/
public DataProxyMetricItemSet getMetricItemSet() {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index cf019ecb4..97dcaa210 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -34,7 +34,7 @@ import org.apache.flume.source.shaded.guava.RateLimiter;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
@@ -70,15 +70,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TubeSink extends AbstractSink implements Configurable {
- protected static final ConcurrentHashMap<String, Long> agentIdMap = new ConcurrentHashMap<String, Long>();
-
+ protected static final ConcurrentHashMap<String, Long> agentIdMap = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
- // private static int BATCH_SIZE = 10000; //unused
- private static final int sendNewMetricRetryCount = 3;
- private static final String topicsFilePath = "topics.properties"; //unused
- private static final String slaTopicFilePath = "slaTopics.properties";
-
private static final LoadingCache<String, Long> agentIdCache = CacheBuilder
.newBuilder().concurrencyLevel(4 * 8).initialCapacity(5000000)
.expireAfterAccess(30, TimeUnit.SECONDS)
@@ -89,28 +83,27 @@ public class TubeSink extends AbstractSink implements Configurable {
return System.currentTimeMillis();
}
});
+ private static final String TOPIC = "topic";
protected static boolean idCleanerStarted = false;
- private static String TOPIC = "topic";
- private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<String, Long>();
+ private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
// key: masterUrl
public Map<String, TubeMultiSessionFactory> sessionFactories;
public Map<String, List<TopicProducerInfo>> masterUrl2producers;
// key: topic
public Map<String, List<TopicProducerInfo>> producerInfoMap;
- public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
private volatile boolean canTake = false;
private volatile boolean canSend = false;
private ConfigManager configManager;
private Map<String, String> topicProperties;
- private ThirdPartyClusterConfig tubeConfig;
+ private MQClusterConfig tubeConfig;
private Set<String> masterHostAndPortLists;
+
// used for RoundRobin different cluster while send message
private AtomicInteger clusterIndex = new AtomicInteger(0);
private LinkedBlockingQueue<EventStat> resendQueue;
private LinkedBlockingQueue<Event> eventQueue;
private RateLimiter diskRateLimiter;
private Thread[] sinkThreadPool;
- private String metaTopicFilePath = topicsFilePath;
private Map<String, String> dimensions;
private DataProxyMetricItemSet metricItemSet;
private IdCacheCleaner idCacheCleaner;
@@ -123,9 +116,6 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* diff publish
- *
- * @param originalSet
- * @param endSet
*/
public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
if (SetUtils.isEqualSet(originalSet, endSet)) {
@@ -155,6 +145,7 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* when masterUrlLists change, update tubeClient
+ *
* @param originalCluster previous masterHostAndPortList set
* @param endCluster new masterHostAndPortList set
*/
@@ -202,30 +193,22 @@ public class TubeSink extends AbstractSink implements Configurable {
// start new client
for (String masterUrl : endCluster) {
if (!originalCluster.contains(masterUrl)) {
-
TubeMultiSessionFactory sessionFactory = createConnection(masterUrl);
-
if (sessionFactory != null) {
List<Set<String>> topicGroups = partitionTopicSet(new HashSet<>(topicProperties.values()));
for (Set<String> topicSet : topicGroups) {
createTopicProducers(masterUrl, sessionFactory, topicSet);
}
-
logger.info("successfully start new tubeClient for the new masterList: {}", masterUrl);
}
}
}
- masterHostAndPortLists = configManager.getThirdPartyClusterUrl2Token().keySet();
+ masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
}
/**
* when there are multi clusters, pick producer based on round-robin
- *
- * @param topic
- * @return
- *
- * @throws TubeClientException
*/
private MessageProducer getProducer(String topic) throws TubeClientException {
if (producerInfoMap.containsKey(topic) && !producerInfoMap.get(topic).isEmpty()) {
@@ -299,17 +282,10 @@ public class TubeSink extends AbstractSink implements Configurable {
TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
sessionFactory = new TubeMultiSessionFactory(conf);
sessionFactories.put(masterHostAndPortList, sessionFactory);
- } catch (TubeClientException e) {
- logger.error("create connnection error in tubesink, "
- + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
- throw new FlumeException("connect to Tube error1, "
- + "maybe zkstr/zkroot set error, please re-check");
} catch (Throwable e) {
- logger.error("create connnection error in tubesink, "
- + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}",
- e.getMessage());
- throw new FlumeException("connect to meta error2, "
- + "maybe tube master set error/shutdown in progress, please re-check");
+ logger.error("connect to tube meta error, maybe tube master set error/shutdown, please re-check", e);
+ throw new FlumeException("connect to tube meta error, maybe tube master set error/shutdown in progress, "
+ + "please re-check");
}
return sessionFactory;
}
@@ -326,11 +302,8 @@ public class TubeSink extends AbstractSink implements Configurable {
for (TubeMultiSessionFactory sessionFactory : sessionFactories.values()) {
try {
sessionFactory.shutdown();
- } catch (TubeClientException e) {
- logger.error("destroy sessionFactory error in tubesink, MetaClientException {}",
- e.getMessage());
} catch (Exception e) {
- logger.error("destroy sessionFactory error in tubesink, ex {}", e.getMessage());
+ logger.error("destroy sessionFactory error in tubesink: ", e);
}
}
}
@@ -342,14 +315,11 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* partition topicSet to different group, each group is associated with a producer;
* if there are multi clusters, then each group is associated with a set of producer
- *
- * @param topicSet
- * @return
*/
private List<Set<String>> partitionTopicSet(Set<String> topicSet) {
List<Set<String>> topicGroups = new ArrayList<>();
- List<String> sortedList = new ArrayList(topicSet);
+ List<String> sortedList = new ArrayList<>(topicSet);
Collections.sort(sortedList);
int maxTopicsEachProducerHolder = tubeConfig.getMaxTopicsEachProducerHold();
int cycle = sortedList.size() / maxTopicsEachProducerHolder;
@@ -357,7 +327,7 @@ public class TubeSink extends AbstractSink implements Configurable {
for (int i = 0; i <= cycle; i++) {
// allocate topic
- Set<String> subset = new HashSet<String>();
+ Set<String> subset = new HashSet<>();
int startIndex = i * maxTopicsEachProducerHolder;
int endIndex = startIndex + maxTopicsEachProducerHolder - 1;
if (i == cycle) {
@@ -378,10 +348,6 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* create producer and publish topic
- *
- * @param masterUrl
- * @param sessionFactory
- * @param topicGroup
*/
private void createTopicProducers(String masterUrl, TubeMultiSessionFactory sessionFactory,
Set<String> topicGroup) {
@@ -420,20 +386,18 @@ public class TubeSink extends AbstractSink implements Configurable {
this.dimensions = new HashMap<>();
this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- //register metrics
+ // register metrics
this.metricItemSet = new DataProxyMetricItemSet(this.getName());
MetricRegister.register(metricItemSet);
- //create tube connection
+ // create tube connection
try {
initCreateConnection();
} catch (FlumeException e) {
logger.error("Unable to create tube client" + ". Exception follows.", e);
-
- /* Try to prevent leaking resources. */
+ // Try to prevent leaking resources
destroyConnection();
-
- /* FIXME: Mark ourselves as failed. */
+ // FIXME: Mark ourselves as failed
stop();
return;
}
@@ -463,9 +427,6 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* resend event
- *
- * @param es
- * @param isDecrement
*/
private void resendEvent(EventStat es, boolean isDecrement) {
try {
@@ -512,8 +473,7 @@ public class TubeSink extends AbstractSink implements Configurable {
dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
event.getHeaders().get(TOPIC));
} else {
- dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
- "");
+ dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
}
if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
@@ -531,7 +491,6 @@ public class TubeSink extends AbstractSink implements Configurable {
metricItem.readFailSize.addAndGet(event.getBody().length);
}
} else {
-
// logger.info("[{}]No data to process in the channel.",getName());
status = Status.BACKOFF;
tx.commit();
@@ -541,8 +500,7 @@ public class TubeSink extends AbstractSink implements Configurable {
try {
tx.rollback();
} catch (Throwable e) {
- logger.error("metasink transaction rollback exception", e);
-
+ logger.error("meta sink transaction rollback exception", e);
}
} finally {
tx.close();
@@ -552,37 +510,28 @@ public class TubeSink extends AbstractSink implements Configurable {
@Override
public void configure(Context context) {
- logger.info(context.toString());
-// logger.info("sinktest:"+getName()+getChannel());//sinktest:meta-sink-msg2null
+ logger.info("configure from context: {}", context);
configManager = ConfigManager.getInstance();
topicProperties = configManager.getTopicProperties();
- masterHostAndPortLists = configManager.getThirdPartyClusterUrl2Token().keySet();
- Preconditions.checkState(masterHostAndPortLists != null || masterHostAndPortLists.isEmpty(),
- "No master and port list specified");
- tubeConfig = configManager.getThirdPartyClusterConfig();
+ masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
+ tubeConfig = configManager.getMqClusterConfig();
configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
@Override
public void update() {
-
- diffSetPublish(new HashSet<String>(topicProperties.values()),
- new HashSet<String>(configManager.getTopicProperties().values()));
+ diffSetPublish(new HashSet<>(topicProperties.values()),
+ new HashSet<>(configManager.getTopicProperties().values()));
}
});
- configManager.getThirdPartyClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
+ configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() {
@Override
public void update() {
- diffUpdateTubeClient(masterHostAndPortLists, configManager.getThirdPartyClusterUrl2Token().keySet());
+ diffUpdateTubeClient(masterHostAndPortLists, configManager.getMqClusterUrl2Token().keySet());
}
});
producerInfoMap = new ConcurrentHashMap<>();
masterUrl2producers = new ConcurrentHashMap<>();
-
- if (tubeConfig.getEnableSlaMetricSink()) {
- this.metaTopicFilePath = slaTopicFilePath;
- }
-
clientIdCache = tubeConfig.getClientIdCache();
if (clientIdCache) {
int survivedTime = tubeConfig.getMaxSurvivedTime();
@@ -609,7 +558,7 @@ public class TubeSink extends AbstractSink implements Configurable {
sinkThreadPool = new Thread[threadNum];
int eventQueueSize = tubeConfig.getEventQueueSize();
Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0");
- eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
+ eventQueue = new LinkedBlockingQueue<>(eventQueueSize);
if (tubeConfig.getDiskIoRatePerSec() != 0) {
diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec());
@@ -617,8 +566,8 @@ public class TubeSink extends AbstractSink implements Configurable {
}
- private Map getNewDimension(String otherKey, String value) {
- Map dimensions = new HashMap<>();
+ private Map<String, String> getNewDimension(String otherKey, String value) {
+ Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
dimensions.put(otherKey, value);
@@ -659,7 +608,6 @@ public class TubeSink extends AbstractSink implements Configurable {
producer.sendMessage(message, new MyCallback(es));
flag.set(true);
-
}
} else {
boolean hasKey = false;
@@ -715,7 +663,7 @@ public class TubeSink extends AbstractSink implements Configurable {
@Override
public void run() {
- logger.info("Sink task {} started.", Thread.currentThread().getName());
+ logger.info("sink task {} started.", Thread.currentThread().getName());
while (canSend) {
boolean decrementFlag = false;
boolean resendBadEvent = false;
@@ -761,12 +709,10 @@ public class TubeSink extends AbstractSink implements Configurable {
if (expireTime != null) {
long currentTime = System.currentTimeMillis();
if (expireTime > currentTime) {
-
// TODO: need to be improved.
// reChannelEvent(es, topic);
continue;
} else {
-
illegalTopicMap.remove(topic);
}
}
@@ -785,7 +731,6 @@ public class TubeSink extends AbstractSink implements Configurable {
AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
sendMessage(producer, event, topic, flagAtomic, es);
decrementFlag = flagAtomic.get();
-
} catch (InterruptedException e) {
logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
return;
@@ -832,10 +777,6 @@ public class TubeSink extends AbstractSink implements Configurable {
/**
* addMetric
- *
- * @param event
- * @param result
- * @param sendTime
*/
private void addMetric(Event event, boolean result, long sendTime) {
Map<String, String> dimensions = new HashMap<>();
@@ -893,10 +834,8 @@ public class TubeSink extends AbstractSink implements Configurable {
if (producer != null) {
try {
producer.shutdown();
- } catch (TubeClientException e) {
- logger.error("destroy producer error in tubesink, MetaClientException {}", e.getMessage());
} catch (Throwable e) {
- logger.error("destroy producer error in tubesink, ex {}", e.getMessage());
+ logger.error("destroy producer error in tube sink", e);
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index c3ed19b7e..824b7a5d9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -26,7 +26,7 @@ import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -99,7 +99,7 @@ public class PulsarClientService {
*
* @param pulsarConfig
*/
- public PulsarClientService(ThirdPartyClusterConfig pulsarConfig, int sinkThreadPoolSize) {
+ public PulsarClientService(MQClusterConfig pulsarConfig, int sinkThreadPoolSize) {
this.sinkThreadPoolSize = sinkThreadPoolSize;
@@ -288,7 +288,7 @@ public class PulsarClientService {
return;
}
pulsarClients = new ConcurrentHashMap<>();
- pulsarUrl2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+ pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
Preconditions.checkState(!pulsarUrl2token.isEmpty(), "No pulsar server url specified");
logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
@@ -328,7 +328,7 @@ public class PulsarClientService {
private PulsarClient initPulsarClient(String pulsarUrl, String token) throws Exception {
ClientBuilder builder = PulsarClient.builder();
- if (ThirdPartyClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
+ if (MQClusterConfig.PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
builder.authentication(AuthenticationFactory.token(token));
}
builder.serviceUrl(pulsarUrl)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index 3157f0dc8..2b44a2d88 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.EventStat;
@@ -66,7 +66,7 @@ public class SinkTask extends Thread {
private LoadingCache<String, Long> agentIdCache;
- private ThirdPartyClusterConfig pulsarConfig;
+ private MQClusterConfig pulsarConfig;
private int maxRetrySendCnt;
/*
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
similarity index 75%
rename from inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java
rename to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
index 45ee12f33..67cf12f69 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestThirdPartyClusterConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestMQClusterConfigLoader.java
@@ -15,41 +15,44 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.config.loader;
+package org.apache.inlong.dataproxy.config.holder;
import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-public class TestThirdPartyClusterConfigLoader {
+/**
+ * Test for {@link MQClusterConfigHolder}
+ */
+public class TestMQClusterConfigLoader {
@Test
public void testUpdateUrl() {
- Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+ Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
Assert.assertEquals(url2token.size(), 3);
Map<String, String> newUrl = new HashMap<>();
newUrl.put("127.0.0.1:8088", "test");
- ConfigManager.getInstance().getThirdPartyClusterHolder().setUrl2token(newUrl);
- url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+ ConfigManager.getInstance().getMqClusterHolder().setUrl2token(newUrl);
+ url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
Assert.assertEquals(newUrl, url2token);
}
@Test
public void testCommonConfig() {
- ThirdPartyClusterConfig config = ConfigManager.getInstance().getThirdPartyClusterConfig();
+ MQClusterConfig config = ConfigManager.getInstance().getMqClusterConfig();
Assert.assertEquals(config.getAuthType(), "token");
Assert.assertEquals(config.getThreadNum(), 5);
- Assert.assertEquals(config.getEnableBatch(), true);
+ Assert.assertTrue(config.getEnableBatch());
Map<String, String> newConfig = new HashMap<>();
newConfig.put("thread_num", "10");
newConfig.put("disk_io_rate_per_sec", "60000");
- ConfigManager.getInstance().getThirdPartyClusterConfig().putAll(newConfig);
- config = ConfigManager.getInstance().getThirdPartyClusterConfig();
+ ConfigManager.getInstance().getMqClusterConfig().putAll(newConfig);
+ config = ConfigManager.getInstance().getMqClusterConfig();
Assert.assertEquals(config.getAuthType(), "token");
Assert.assertEquals(config.getThreadNum(), 10);
Assert.assertEquals(config.getDiskIoRatePerSec(), 60000);
@@ -57,16 +60,16 @@ public class TestThirdPartyClusterConfigLoader {
@Test
public void testTubeUrl() {
- Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+ Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
Assert.assertEquals(url2token.size(), 3);
Assert.assertEquals("", url2token.get("127.0.0.1:8080,127.0.0.1:8088"));
}
@Test
public void testPulsarUrl() {
- Map<String, String> url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
+ Map<String, String> url2token = ConfigManager.getInstance().getMqClusterUrl2Token();
Assert.assertEquals("pulsartoken1", url2token.get("pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600"));
Assert.assertEquals("pulsartoken2", url2token.get("pulsar2://127.0.0.1:6680"));
-
}
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
index 6ec998fbc..77773cffc 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,27 +17,24 @@
package org.apache.inlong.dataproxy.config.loader;
-import static org.junit.Assert.assertEquals;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.junit.Test;
import java.util.Map;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
-import org.apache.inlong.common.metric.MetricListener;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
/**
- *
- * TestClassResourceCommonPropertiesLoader
+ * Test for {@link ClassResourceCommonPropertiesLoader}
*/
public class TestClassResourceCommonPropertiesLoader {
/**
* testResult
- *
- * @throws Exception
*/
@Test
- public void testResult() throws Exception {
+ public void testResult() {
// increase source
ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
Map<String, String> props = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
index 1501f6371..12bf36991 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,30 +17,29 @@
package org.apache.inlong.dataproxy.config.loader;
-import static org.junit.Assert.assertEquals;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
/**
- *
- * TestContextCacheClusterConfigLoader
+ * Test for {@link ContextCacheClusterConfigLoader}
*/
public class TestContextCacheClusterConfigLoader {
public static final Logger LOG = LoggerFactory.getLogger(TestContextCacheClusterConfigLoader.class);
- private static Context context;
private static Context sinkContext;
/**
@@ -48,19 +47,17 @@ public class TestContextCacheClusterConfigLoader {
*/
@BeforeClass
public static void setup() {
- Map<String, String> result = new ConcurrentHashMap<>();
- try (InputStream inStream = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource(
- "dataproxy-pulsar.conf")
- .openStream()) {
+ URL resource = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource("dataproxy-pulsar.conf");
+ try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
Properties props = new Properties();
props.load(inStream);
+
+ Map<String, String> result = new ConcurrentHashMap<>();
for (Map.Entry<Object, Object> entry : props.entrySet()) {
result.put((String) entry.getKey(), (String) entry.getValue());
}
- context = new Context(result);
+ Context context = new Context(result);
sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
- } catch (UnsupportedEncodingException e) {
- LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
} catch (Exception e) {
LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
}
@@ -68,11 +65,9 @@ public class TestContextCacheClusterConfigLoader {
/**
* testResult
- *
- * @throws Exception
*/
@Test
- public void testResult() throws Exception {
+ public void testResult() {
ContextCacheClusterConfigLoader loader = new ContextCacheClusterConfigLoader();
loader.configure(sinkContext);
List<CacheClusterConfig> configList = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
index 376717bc6..9cca96efd 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,30 +17,29 @@
package org.apache.inlong.dataproxy.config.loader;
-import static org.junit.Assert.assertEquals;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
+import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
/**
- *
- * TestContextCacheClusterConfigLoader
+ * Test for {@link ContextIdTopicConfigLoader}
*/
public class TestContextIdTopicConfigLoader {
public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
- private static Context context;
private static Context sinkContext;
/**
@@ -48,19 +47,16 @@ public class TestContextIdTopicConfigLoader {
*/
@BeforeClass
public static void setup() {
- Map<String, String> result = new ConcurrentHashMap<>();
- try (InputStream inStream = TestContextIdTopicConfigLoader.class.getClassLoader().getResource(
- "dataproxy-pulsar.conf")
- .openStream()) {
+ URL resource = TestContextIdTopicConfigLoader.class.getClassLoader().getResource("dataproxy-pulsar.conf");
+ try (InputStream inStream = Objects.requireNonNull(resource).openStream()) {
Properties props = new Properties();
props.load(inStream);
+ Map<String, String> result = new ConcurrentHashMap<>();
for (Map.Entry<Object, Object> entry : props.entrySet()) {
result.put((String) entry.getKey(), (String) entry.getValue());
}
- context = new Context(result);
+ Context context = new Context(result);
sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
- } catch (UnsupportedEncodingException e) {
- LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
} catch (Exception e) {
LOG.error("fail to load properties, file ={}, and e= {}", "dataproxy-pulsar.conf", e);
}
@@ -68,11 +64,9 @@ public class TestContextIdTopicConfigLoader {
/**
* testResult
- *
- * @throws Exception
*/
@Test
- public void testResult() throws Exception {
+ public void testResult() {
ContextIdTopicConfigLoader loader = new ContextIdTopicConfigLoader();
loader.configure(sinkContext);
List<IdTopicConfig> configList = loader.load();
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
index 7729deac9..af0e03ae1 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 32e5a2163..e072f150f 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
index a074fe519..8996931ec 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index a372d7daa..7fd6fee7c 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -1,22 +1,21 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
+
proxy.cluster.name=proxy_inlong5th_sz
metricDomains=DataProxy
metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
-metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
+metricDomains.DataProxy.snapshotInterval=60000
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf b/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
index d472f5146..33da0b457 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/dataproxy-pulsar.conf
@@ -1,21 +1,20 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
+
proxy_inlong5th_sz.channels=ch-msg1
proxy_inlong5th_sz.sinks=pulsar-sink-more1
proxy_inlong5th_sz.sources=agent-source sdk-source
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
similarity index 77%
rename from inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties
rename to inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
index 9ecb8b09b..a49e21048 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/third_party_cluster.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/mq_cluster.properties
@@ -15,10 +15,9 @@
# limitations under the License.
#
-third-party-cluster.index1=pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600=pulsartoken1
-third-party-cluster.index2=pulsar2://127.0.0.1:6680=pulsartoken2
-
-third-party-cluster.index3=127.0.0.1:8080,127.0.0.1:8088=
+mq_cluster.index1=pulsar1://127.0.0.1:6650,pulsar2://127.0.0.1:6600=pulsartoken1
+mq_cluster.index2=pulsar2://127.0.0.1:6680=pulsartoken2
+mq_cluster.index3=127.0.0.1:8080,127.0.0.1:8088=
pulsar_auth_type=token
thread_num=5
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
index e1cac2480..76fd0d4f0 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/topics.properties
@@ -15,4 +15,4 @@
# limitations under the License.
#
-b_docker_test_1.docker_test_1=persistent://public/b_docker_test_1/docker_test_1
\ No newline at end of file
+b_docker_test_1.docker_test_1=persistent://public/b_docker_test_1/docker_test_1
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
index 06a3a64bd..9405505c7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/InlongClusterService.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
@@ -143,7 +143,7 @@ public interface InlongClusterService {
* @param clusterName cluster name
* @return data proxy config, includes mq clusters and topics
*/
- ThirdPartyClusterDTO getDataProxyConfig(String clusterTag, String clusterName);
+ DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName);
/**
* Get data proxy cluster list by the given cluster name
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
index dd5300a05..25f278053 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceImpl.java
@@ -27,8 +27,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
@@ -339,7 +339,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
}
@Override
- public ThirdPartyClusterDTO getDataProxyConfig(String clusterTag, String clusterName) {
+ public DataProxyConfig getDataProxyConfig(String clusterTag, String clusterName) {
LOGGER.debug("GetDPConfig: begin to get config by cluster tag={} name={}", clusterTag, clusterName);
// get all data proxy clusters
@@ -349,7 +349,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
.type(ClusterType.DATA_PROXY.getType())
.build();
List<InlongClusterEntity> clusterList = clusterMapper.selectByCondition(request);
- ThirdPartyClusterDTO result = new ThirdPartyClusterDTO();
+ DataProxyConfig result = new DataProxyConfig();
if (CollectionUtils.isEmpty(clusterList)) {
LOGGER.warn("GetDPConfig: data proxy cluster not found by tag={} name={}", clusterTag, clusterName);
return result;
@@ -372,7 +372,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
clusterTagList, groupList.size());
- List<DataProxyConfig> topicList = new ArrayList<>();
+ List<DataProxyTopicInfo> topicList = new ArrayList<>();
for (InlongGroupBriefInfo groupInfo : groupList) {
String groupId = groupInfo.getInlongGroupId();
String mqResource = groupInfo.getMqResource();
@@ -400,13 +400,13 @@ public class InlongClusterServiceImpl implements InlongClusterService {
String streamId = streamInfo.getInlongStreamId();
String topic = String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT,
tenant, mqResource, streamInfo.getMqResource());
- DataProxyConfig topicConfig = new DataProxyConfig();
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
topicConfig.setInlongGroupId(groupId + "/" + streamId);
topicConfig.setTopic(topic);
topicList.add(topicConfig);
}
} else if (type == MQType.TUBE) {
- DataProxyConfig topicConfig = new DataProxyConfig();
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
topicConfig.setInlongGroupId(groupId);
topicConfig.setTopic(mqResource);
topicList.add(topicConfig);
@@ -415,7 +415,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
// get mq cluster info
LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList);
- List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
+ List<MQClusterInfo> mqSet = new ArrayList<>();
List<String> typeList = Arrays.asList(ClusterType.TUBE.getType(), ClusterType.PULSAR.getType());
InlongClusterPageRequest pageRequest = InlongClusterPageRequest.builder()
.typeList(typeList)
@@ -423,7 +423,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
.build();
List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
for (InlongClusterEntity cluster : mqClusterList) {
- ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
+ MQClusterInfo clusterInfo = new MQClusterInfo();
clusterInfo.setUrl(cluster.getUrl());
clusterInfo.setToken(cluster.getToken());
Map<String, String> configParams = GSON.fromJson(cluster.getExtParams(), Map.class);
@@ -431,7 +431,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
mqSet.add(clusterInfo);
}
- result.setMqSet(mqSet);
+ result.setMqClusterList(mqSet);
result.setTopicList(topicList);
return result;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
index 70dfe7f52..ddb11af17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
@@ -41,10 +41,8 @@ public class PulsarUtils {
/**
* Get pulsar admin info
*/
- public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo)
- throws PulsarClientException {
- Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "pulsar adminUrl is empty, "
- + "check third party cluster table");
+ public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo) throws PulsarClientException {
+ Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "Pulsar adminUrl cannot be empty");
PulsarAdmin pulsarAdmin;
if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index 3c3701a8f..f3993a79a 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -21,7 +21,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.manager.service.core.InlongClusterService;
@@ -68,14 +69,14 @@ public class DataProxyController {
@ApiImplicitParam(name = "clusterTag", value = "cluster tag", dataTypeClass = String.class),
@ApiImplicitParam(name = "clusterName", value = "cluster name", dataTypeClass = String.class)
})
- public Response<ThirdPartyClusterDTO> getConfig(
+ public Response<DataProxyConfig> getConfig(
@RequestParam(required = false) String clusterTag,
@RequestParam(required = true) String clusterName) {
- ThirdPartyClusterDTO dto = clusterService.getDataProxyConfig(clusterTag, clusterName);
- if (dto.getMqSet().isEmpty() || dto.getTopicList().isEmpty()) {
+ DataProxyConfig config = clusterService.getDataProxyConfig(clusterTag, clusterName);
+ if (CollectionUtils.isEmpty(config.getMqClusterList()) || CollectionUtils.isEmpty(config.getTopicList())) {
return Response.fail("failed to get mq clusters or topics");
}
- return Response.success(dto);
+ return Response.success(config);
}
@GetMapping("/dataproxy/getAllConfig")