You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/26 13:19:36 UTC

[incubator-inlong] branch master updated: [INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bf11b  [INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)
b6bf11b is described below

commit b6bf11b5c63b53c04022783d80476210a9276ad0
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Thu Aug 26 21:19:29 2021 +0800

    [INLONG-1459][DataProxy] proxy address configuration is redundant for inlong-agent (#1482)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../plugin/fetcher/ManagerResultFormatter.java     |  2 +-
 .../apache/inlong/dataproxy/ProxyClientConfig.java |  3 +-
 .../dataproxy/config/ProxyConfigManager.java       | 60 +++++++++-------------
 3 files changed, 27 insertions(+), 38 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 04e88ed..7c164dc 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -119,7 +119,7 @@ public class ManagerResultFormatter {
         if (!dataConfigs.getAdditionalAttr().isEmpty()) {
             job.setAddictiveString(dataConfigs.getAdditionalAttr());
         }
-        if (!dataConfigs.getCycleUnit().isEmpty()) {
+        if (dataConfigs.getCycleUnit() != null) {
             job.setCycleUnit(dataConfigs.getCycleUnit());
         }
         return job;
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
index 392665e..720c5b1 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/ProxyClientConfig.java
@@ -55,7 +55,7 @@ public class ProxyClientConfig {
     private boolean enableSlaMetric = false;
 
     private int managerConnectionTimeout = 10000;
-    private boolean readProxyIPFromLocal = true;
+    private boolean readProxyIPFromLocal = false;
     /**
      * Default connection, handshake, and initial request timeout in
      * milliseconds.
@@ -99,6 +99,7 @@ public class ProxyClientConfig {
         if (Utils.isBlank(managerIp)) {
             throw new IllegalArgumentException("managerIp is Blank!");
         }
+        this.proxyIPServiceURL = "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
         this.bid = bid;
         this.netTag = netTag;
         this.isLocalVisit = isLocalVisit;
diff --git a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
index a6491b5..ca3c54e 100644
--- a/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-dataproxy-sdk/src/main/java/org/apache/inlong/dataproxy/config/ProxyConfigManager.java
@@ -53,9 +53,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.impl.client.HttpClients;
@@ -75,7 +75,9 @@ import org.slf4j.LoggerFactory;
 
 
 public class ProxyConfigManager extends Thread {
+
     private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class);
+    public static final String APPLICATION_JSON = "application/json";
 
     private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
     /*the status of the cluster.if this value is changed,we need rechoose  three proxy*/
@@ -558,22 +560,16 @@ public class ProxyConfigManager extends Thread {
             throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
         }
 
-
-
         int bidNum = 0;
         if (localProxyAddrJson.has("bsn")) {
             bidNum = localProxyAddrJson.get("bsn").getAsInt();
         }
 
-        if (!localProxyAddrJson.has("switch")) {
-            throw new Exception("Parse local proxyList failure: switch field is not exist !");
-        }
         int load = ConfigConstants.LOAD_THRESHOLD;
         if (localProxyAddrJson.has("load")) {
             int inLoad = localProxyAddrJson.get("load").getAsInt();
             load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
         }
-        int switchStat = Integer.parseInt(localProxyAddrJson.get("switch").getAsString());
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
         proxyEntry.setBid(clientConfig.getBid());
         boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
@@ -581,7 +577,7 @@ public class ProxyConfigManager extends Thread {
         Map<String, HostInfo> hostMap = getHostInfoMap(
             localProxyAddrJson);
         proxyEntry.setHostMap(hostMap);
-        proxyEntry.setSwitchStat(switchStat);
+        proxyEntry.setSwitchStat(0);
         Map<String, Integer> tidMap = getTidMap(localProxyAddrJson);
         proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
         proxyEntry.setLoad(load);
@@ -665,55 +661,35 @@ public class ProxyConfigManager extends Thread {
 
     public ProxyConfigEntry requestProxyList(String url) {
         ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
-        params.add(new BasicNameValuePair("bid", clientConfig.getBid()));
-        params.add(new BasicNameValuePair("net_tag", clientConfig.getNetTag()));
+        params.add(new BasicNameValuePair("netTag", clientConfig.getNetTag()));
         params.add(new BasicNameValuePair("ip", this.localIP));
-        params.add(new BasicNameValuePair("sn_query", String.valueOf(true)));
         logger.info("Begin to get configure from manager {}, param is {}", url, params);
 
         JsonObject jsonRes = requestConfiguration(url, params);
         if (jsonRes == null) {
             return null;
         }
-        if (!jsonRes.has("size")) {
-            logger.error("Parse proxyList failure: size field is not exist for response from manager!");
-            return null;
-        }
-        int size = jsonRes.get("size").getAsInt();
-        if (size == 0) {
-            logger.info("Parse proxyList failure: proxy list size = 0!");
-            return null;
-        }
-        boolean isInterVisit = false;
-        if (jsonRes.has("isInterVisit")) {
-            isInterVisit = jsonRes.get("isInterVisit").getAsInt() != 0;
-        }
+
         Map<String, HostInfo> hostMap = formatHostInfoMap(
             jsonRes);
 
         if (hostMap == null) {
             return null;
         }
-        size = hostMap.size();
         int bidNum = 0;
         if (jsonRes.has("bsn")) {
             bidNum = jsonRes.get("bsn").getAsInt();
         }
-        if (!jsonRes.has("switch")) {
-            logger.error("Parse proxyList failure: switch field is not exist for response from manager!");
-            return null;
-        }
         int load = ConfigConstants.LOAD_THRESHOLD;
         if (jsonRes.has("load")) {
             int inLoad = jsonRes.get("load").getAsInt();
             load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
         }
-        int switchStat = Integer.parseInt(jsonRes.get("switch").getAsString());
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
         proxyEntry.setBid(clientConfig.getBid());
-        proxyEntry.setInterVisit(isInterVisit);
+        proxyEntry.setInterVisit(true);
         proxyEntry.setHostMap(hostMap);
-        proxyEntry.setSwitchStat(switchStat);
+        proxyEntry.setSwitchStat(0);
         Map<String, Integer> tidMap = getTidMap(jsonRes);
         proxyEntry.setBidNumAndTidNumMap(bidNum, tidMap);
         proxyEntry.setLoad(load);
@@ -726,7 +702,7 @@ public class ProxyConfigManager extends Thread {
 
     private Map<String, HostInfo> formatHostInfoMap(JsonObject jsonRes) {
         Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
-        JsonArray jsonHostList = jsonRes.getAsJsonArray("address");
+        JsonArray jsonHostList = jsonRes.getAsJsonArray("data");
         if (jsonHostList == null) {
             logger.info("Parse proxyList failure: address field is not exist for response from manager!");
             return null;
@@ -745,12 +721,12 @@ public class ProxyConfigManager extends Thread {
                             + i + ") for response from manager!");
                     return null;
                 }
-                if (!jsonItem.has("host")) {
+                if (!jsonItem.has("ip")) {
                     logger.error("Parse proxyList failure: host field is not exist in address("
                             + i + ") for response from manager!");
                     return null;
                 }
-                String hostItem = jsonItem.get("host").getAsString();
+                String hostItem = jsonItem.get("ip").getAsString();
                 if (Utils.isBlank(hostItem)) {
                     logger.error("Parse proxyList failure: host value is blank in address("
                             + i + ") for response from manager!");
@@ -848,7 +824,8 @@ public class ProxyConfigManager extends Thread {
                             Utils.getAuthorizenInfo(clientConfig.getUserName(),
                                     clientConfig.getSecretKey(), timestamp, nonce));
                 }
-                UrlEncodedFormEntity se = new UrlEncodedFormEntity(params);
+
+                StringEntity se = getEntity(params);
                 httpPost.setEntity(se);
                 HttpResponse response = httpClient.execute(httpPost);
                 returnStr = EntityUtils.toString(response.getEntity());
@@ -883,6 +860,17 @@ public class ProxyConfigManager extends Thread {
         }
     }
 
+    /** Builds the POST body: a flat JSON object of the given name/value pairs, encoded as UTF-8. */
+    private StringEntity getEntity(List<BasicNameValuePair> params) throws UnsupportedEncodingException {
+        JsonObject jsonObject = new JsonObject();
+        for (BasicNameValuePair pair : params) {
+            jsonObject.addProperty(pair.getName(), pair.getValue());
+        }
+        StringEntity se = new StringEntity(jsonObject.toString(), "UTF-8"); // one-arg ctor would use ISO-8859-1
+        se.setContentType(APPLICATION_JSON);
+        return se;
+    }
+
     private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params)
         throws NoSuchAlgorithmException, KeyManagementException {
         CloseableHttpClient httpClient;