You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/26 13:19:36 UTC
[incubator-inlong] branch master updated: [INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b6bf11b [INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)
b6bf11b is described below
commit b6bf11b5c63b53c04022783d80476210a9276ad0
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Thu Aug 26 21:19:29 2021 +0800
[INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../plugin/fetcher/ManagerResultFormatter.java | 2 +-
.../apache/inlong/dataproxy/ProxyClientConfig.java | 3 +-
.../dataproxy/config/ProxyConfigManager.java | 60 +++++++++-------------
3 files changed, 27 insertions(+), 38 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 04e88ed..7c164dc 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -119,7 +119,7 @@ public class ManagerResultFormatter {
if (!dataConfigs.getAdditionalAttr().isEmpty()) {
job.setAddictiveString(dataConfigs.getAdditionalAttr());
}
- if (!dataConfigs.getCycleUnit().isEmpty()) {
+ if (dataConfigs.getCycleUnit() != null) {
job.setCycleUnit(dataConfigs.getCycleUnit());
}
return job;
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
index 392665e..720c5b1 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
@@ -55,7 +55,7 @@ public class ProxyClientConfig {
private boolean enableSlaMetric = false;
private int managerConnectionTimeout = 10000;
- private boolean readProxyIPFromLocal = true;
+ private boolean readProxyIPFromLocal = false;
/**
* Default connection, handshake, and initial request timeout in
* milliseconds.
@@ -99,6 +99,7 @@ public class ProxyClientConfig {
if (Utils.isBlank(managerIp)) {
throw new IllegalArgumentException("managerIp is Blank!");
}
+ this.proxyIPServiceURL = "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
this.bid = bid;
this.netTag = netTag;
this.isLocalVisit = isLocalVisit;
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
index a6491b5..ca3c54e 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
@@ -53,9 +53,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClients;
@@ -75,7 +75,9 @@ import org.slf4j.LoggerFactory;
public class ProxyConfigManager extends Thread {
+
private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class);
+ public static final String APPLICATION_JSON = "application/json";
private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
/*the status of the cluster.if this value is changed,we need rechoose three proxy*/
@@ -558,22 +560,16 @@ public class ProxyConfigManager extends Thread {
throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
}
-
-
int bidNum = 0;
if (localProxyAddrJson.has("bsn")) {
bidNum = localProxyAddrJson.get("bsn").getAsInt();
}
- if (!localProxyAddrJson.has("switch")) {
- throw new Exception("Parse local proxyList failure: switch field is not exist !");
- }
int load = ConfigConstants.LOAD_THRESHOLD;
if (localProxyAddrJson.has("load")) {
int inLoad = localProxyAddrJson.get("load").getAsInt();
load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
}
- int switchStat = Integer.parseInt(localProxyAddrJson.get("switch").getAsString());
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
proxyEntry.setBid(clientConfig.getBid());
boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
@@ -581,7 +577,7 @@ public class ProxyConfigManager extends Thread {
Map<String, HostInfo> hostMap = getHostInfoMap(
localProxyAddrJson);
proxyEntry.setHostMap(hostMap);
- proxyEntry.setSwitchStat(switchStat);
+ proxyEntry.setSwitchStat(0);
Map<String, Integer> tidMap = getTidMap(localProxyAddrJson);
proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
proxyEntry.setLoad(load);
@@ -665,55 +661,35 @@ public class ProxyConfigManager extends Thread {
public ProxyConfigEntry requestProxyList(String url) {
ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
- params.add(new BasicNameValuePair("bid", clientConfig.getBid()));
- params.add(new BasicNameValuePair("net_tag", clientConfig.getNetTag()));
+ params.add(new BasicNameValuePair("netTag", clientConfig.getNetTag()));
params.add(new BasicNameValuePair("ip", this.localIP));
- params.add(new BasicNameValuePair("sn_query", String.valueOf(true)));
logger.info("Begin to get configure from manager {}, param is {}", url, params);
JsonObject jsonRes = requestConfiguration(url, params);
if (jsonRes == null) {
return null;
}
- if (!jsonRes.has("size")) {
- logger.error("Parse proxyList failure: size field is not exist for response from manager!");
- return null;
- }
- int size = jsonRes.get("size").getAsInt();
- if (size == 0) {
- logger.info("Parse proxyList failure: proxy list size = 0!");
- return null;
- }
- boolean isInterVisit = false;
- if (jsonRes.has("isInterVisit")) {
- isInterVisit = jsonRes.get("isInterVisit").getAsInt() != 0;
- }
+
Map<String, HostInfo> hostMap = formatHostInfoMap(
jsonRes);
if (hostMap == null) {
return null;
}
- size = hostMap.size();
int bidNum = 0;
if (jsonRes.has("bsn")) {
bidNum = jsonRes.get("bsn").getAsInt();
}
- if (!jsonRes.has("switch")) {
- logger.error("Parse proxyList failure: switch field is not exist for response from manager!");
- return null;
- }
int load = ConfigConstants.LOAD_THRESHOLD;
if (jsonRes.has("load")) {
int inLoad = jsonRes.get("load").getAsInt();
load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
}
- int switchStat = Integer.parseInt(jsonRes.get("switch").getAsString());
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
proxyEntry.setBid(clientConfig.getBid());
- proxyEntry.setInterVisit(isInterVisit);
+ proxyEntry.setInterVisit(true);
proxyEntry.setHostMap(hostMap);
- proxyEntry.setSwitchStat(switchStat);
+ proxyEntry.setSwitchStat(0);
Map<String, Integer> tidMap = getTidMap(jsonRes);
proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
proxyEntry.setLoad(load);
@@ -726,7 +702,7 @@ public class ProxyConfigManager extends Thread {
private Map<String, HostInfo> formatHostInfoMap(JsonObject jsonRes) {
Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
- JsonArray jsonHostList = jsonRes.getAsJsonArray("address");
+ JsonArray jsonHostList = jsonRes.getAsJsonArray("data");
if (jsonHostList == null) {
logger.info("Parse proxyList failure: address field is not exist for response from manager!");
return null;
@@ -745,12 +721,12 @@ public class ProxyConfigManager extends Thread {
+ i + ") for response from manager!");
return null;
}
- if (!jsonItem.has("host")) {
+ if (!jsonItem.has("ip")) {
logger.error("Parse proxyList failure: host field is not exist in address("
+ i + ") for response from manager!");
return null;
}
- String hostItem = jsonItem.get("host").getAsString();
+ String hostItem = jsonItem.get("ip").getAsString();
if (Utils.isBlank(hostItem)) {
logger.error("Parse proxyList failure: host value is blank in address("
+ i + ") for response from manager!");
@@ -848,7 +824,8 @@ public class ProxyConfigManager extends Thread {
Utils.getAuthorizenInfo(clientConfig.getUserName(),
clientConfig.getSecretKey(), timestamp, nonce));
}
- UrlEncodedFormEntity se = new UrlEncodedFormEntity(params);
+
+ StringEntity se = getEntity(params);
httpPost.setEntity(se);
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
@@ -883,6 +860,17 @@ public class ProxyConfigManager extends Thread {
}
}
+ private StringEntity getEntity(List<BasicNameValuePair> params)
+ throws UnsupportedEncodingException {
+ JsonObject jsonObject = new JsonObject();
+ for (BasicNameValuePair pair : params) {
+ jsonObject.addProperty(pair.getName(), pair.getValue());
+ }
+ StringEntity se = new StringEntity(jsonObject.toString());
+ se.setContentType(APPLICATION_JSON);
+ return se;
+ }
+
private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params)
throws NoSuchAlgorithmException, KeyManagementException {
CloseableHttpClient httpClient;