You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/04 04:13:34 UTC

[incubator-doris] branch master updated: [Revert] Revert RestService.java (#6994)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d19a971  [Revert] Revert RestService.java (#6994)
d19a971 is described below

commit d19a9715821dad38cc58421532dbbdbc1efc36b5
Author: wei zhao <zh...@163.com>
AuthorDate: Thu Nov 4 12:13:18 2021 +0800

    [Revert] Revert RestService.java (#6994)
---
 .../org/apache/doris/spark/rest/RestService.java   | 139 ++++++++++++++-------
 .../apache/doris/spark/rest/TestRestService.java   |  20 +++
 2 files changed, 116 insertions(+), 43 deletions(-)

diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index dce540c..bb91538 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -31,19 +31,27 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE
 import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
 import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Base64;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.Set;
+import java.util.HashSet;
 import java.util.stream.Collectors;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
@@ -52,22 +60,17 @@ import org.apache.doris.spark.exception.ConnectedFailedException;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
-import org.apache.doris.spark.rest.models.*;
+import org.apache.doris.spark.rest.models.Backend;
+import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.QueryPlan;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.rest.models.Tablet;
 import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -83,7 +86,8 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
-    private static final String BACKENDS = "/api/show_proc?path=//backends";
+    private static final String BACKENDS = "/rest/v1/system?path=//backends";
+
 
     /**
      * send request to Doris FE and get response json string.
@@ -110,36 +114,37 @@ public class RestService implements Serializable {
                 .build();
 
         request.setConfig(requestConfig);
-
         String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
         String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
-
-        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-        credentialsProvider.setCredentials(
-                AuthScope.ANY,
-                new UsernamePasswordCredentials(user, password));
-        HttpClientContext context = HttpClientContext.create();
-        context.setCredentialsProvider(credentialsProvider);
         logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
-
         IOException ex = null;
         int statusCode = -1;
 
         for (int attempt = 0; attempt < retries; attempt++) {
-            CloseableHttpClient httpClient = HttpClients.createDefault();
             logger.debug("Attempt {} to request {}.", attempt, request.getURI());
             try {
-                CloseableHttpResponse response = httpClient.execute(request, context);
-                statusCode = response.getStatusLine().getStatusCode();
-                if (statusCode != HttpStatus.SC_OK) {
+                String response;
+                if (request instanceof HttpGet){
+                    response = getConnectionGet(request.getURI().toString(), user, password,logger);
+                } else {
+                    response = getConnectionPost(request,user, password,logger);
+                }
+                if (response == null) {
                     logger.warn("Failed to get response from Doris FE {}, http code is {}",
                             request.getURI(), statusCode);
                     continue;
                 }
-                String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                 logger.trace("Success get response from Doris FE: {}, response is: {}.",
-                        request.getURI(), res);
-                return res;
+                        request.getURI(), response);
+                ObjectMapper mapper = new ObjectMapper();
+                Map map = mapper.readValue(response, Map.class);
+                //Handle the problem of inconsistent data format returned by http v1 and v2
+                if (map.containsKey("code") && map.containsKey("msg")) {
+                    Object data = map.get("data");
+                    return mapper.writeValueAsString(data);
+                } else {
+                    return response;
+                }
             } catch (IOException e) {
                 ex = e;
                 logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -150,6 +155,54 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
     }
 
+    /** Opens a connection to {@code request} with an HTTP Basic Authorization header and returns parseResponse's result. */
+    private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
+        byte[] credentials = String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8);
+        String basicToken = Base64.getEncoder().encodeToString(credentials);
+        HttpURLConnection conn = (HttpURLConnection) new URL(request).openConnection();
+        conn.setRequestProperty("Authorization", "Basic " + basicToken);
+        // connect explicitly before the response is read
+        conn.connect();
+        return parseResponse(conn, logger);
+    }
+
+    /** Returns the body of an HTTP 200 response decoded as UTF-8, lines joined without terminators; other statuses throw IOException. */
+    private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
+        int statusCode = connection.getResponseCode();
+        if (statusCode != HttpStatus.SC_OK) {
+            logger.warn("Failed to get response from Doris  {}, http code is {}", connection.getURL(), statusCode);
+            throw new IOException("Failed to get response from Doris");
+        }
+        // StringBuilder avoids re-copying the accumulated body for every line read
+        StringBuilder body = new StringBuilder();
+        // try-with-resources closes the reader and its underlying stream even if readLine() throws
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
+            for (String line = in.readLine(); line != null; line = in.readLine()) {
+                body.append(line);
+            }
+        }
+        return body.toString();
+    }
+
+    /** Replays {@code request} over HttpURLConnection with Basic auth, forwarding its entity bytes unchanged; returns parseResponse's result. */
+    private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
+        URL url = new URL(request.getURI().toString());
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod(request.getMethod());
+        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+        // Copy raw bytes: decoding and re-encoding with the platform default charset could corrupt non-ASCII
+        // bodies, and PrintWriter would swallow write failures. Both streams are closed on every path.
+        try (InputStream content = ((HttpPost) request).getEntity().getContent();
+             java.io.OutputStream out = conn.getOutputStream()) {
+            IOUtils.copy(content, out);
+        }
+        // read response
+        return parseResponse(conn,logger);
+    }
     /**
      * parse table identifier to array.
      * @param tableIdentifier table identifier string
@@ -426,7 +479,6 @@ public class RestService implements Serializable {
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
-    @VisibleForTesting
     public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
         String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
@@ -446,12 +498,19 @@ public class RestService implements Serializable {
     }
 
 
-
+    /**
+     * translate Doris FE response to inner {@link BackendRow} struct.
+     * @param response Doris FE response
+     * @param logger {@link Logger}
+     * @return inner {@link List<BackendRow>} struct
+     * @throws DorisException,IOException throw when translate failed
+     * */
+    @VisibleForTesting
     static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
         com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
-        List<List<String>> backend;
+        Backend backend;
         try {
-            backend = mapper.readValue(response, List.class);
+            backend = mapper.readValue(response, Backend.class);
         } catch (com.fasterxml.jackson.core.JsonParseException e) {
             String errMsg = "Doris BE's response is not a json. res: " + response;
             logger.error(errMsg, e);
@@ -470,13 +529,7 @@ public class RestService implements Serializable {
             logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
-        List<BackendRow> backendRows = backend.stream().map(array -> {
-            BackendRow backendRow = new BackendRow();
-            backendRow.setIP(array.get(2));
-            backendRow.setHttpPort(array.get(6));
-            backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
-            return backendRow;
-        }).filter(v -> v.getAlive()).collect(Collectors.toList());
+        List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
@@ -494,7 +547,7 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
-                                                           String opaquedQueryPlan, String database, String table, Logger logger)
+            String opaquedQueryPlan, String database, String table, Logger logger)
             throws IllegalArgumentException {
         int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
         List<PartitionDefinition> partitions = new ArrayList<>();
diff --git a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 95ae0c2..484be45 100644
--- a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -37,6 +37,7 @@ import org.apache.doris.spark.cfg.PropertiesSettings;
 import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.rest.models.BackendRow;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
@@ -293,4 +294,23 @@ public class TestRestService {
 
         Assert.assertEquals(expected, actual);
     }
+
+    @Test
+    public void testParseBackend() throws Exception {
+        String backendsJson = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
+                "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
+                "\"HttpPort\",\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\"," +
+                "\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\"," +
+                "\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":" +
+                "\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\",\\\"lastStreamLoadTime\\\":" +
+                "-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\",\"DataUsedCapacity\":\"0.000 " +
+                "\",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\",\"__hrefPaths\":[\"/rest/v1/system?" +
+                "path=//backends/10002\"],\"Cluster\":\"default_cluster\",\"Alive\":\"true\",\"MaxDiskUsedPct\":" +
+                "\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\",\"ClusterDecommissioned\":\"false\"," +
+                "\"AvailCapacity\":\"1.000 B\",\"Version\":\"\",\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\"," +
+                "\"LastStartTime\":\"\\\\N\",\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":" +
+                "\"{\\\"location\\\" : \\\"default\\\"}\",\"HostName\":\"localhost\"}]}";
+        List<BackendRow> aliveRows = RestService.parseBackend(backendsJson, logger);
+        Assert.assertFalse(aliveRows == null || aliveRows.isEmpty()); // the only row has Alive "true", so it must be kept
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org