You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/04 04:13:34 UTC
[incubator-doris] branch master updated: [Revert] Revert
RestService.java (#6994)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d19a971 [Revert] Revert RestService.java (#6994)
d19a971 is described below
commit d19a9715821dad38cc58421532dbbdbc1efc36b5
Author: wei zhao <zh...@163.com>
AuthorDate: Thu Nov 4 12:13:18 2021 +0800
[Revert] Revert RestService.java (#6994)
---
.../org/apache/doris/spark/rest/RestService.java | 139 ++++++++++++++-------
.../apache/doris/spark/rest/TestRestService.java | 20 +++
2 files changed, 116 insertions(+), 43 deletions(-)
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index dce540c..bb91538 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -31,19 +31,27 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
import java.util.Set;
+import java.util.HashSet;
import java.util.stream.Collectors;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
@@ -52,22 +60,17 @@ import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
-import org.apache.doris.spark.rest.models.*;
+import org.apache.doris.spark.rest.models.Backend;
+import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.QueryPlan;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.rest.models.Tablet;
import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -83,7 +86,8 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
- private static final String BACKENDS = "/api/show_proc?path=//backends";
+ private static final String BACKENDS = "/rest/v1/system?path=//backends";
+
/**
* send request to Doris FE and get response json string.
@@ -110,36 +114,37 @@ public class RestService implements Serializable {
.build();
request.setConfig(requestConfig);
-
String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
-
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(user, password));
- HttpClientContext context = HttpClientContext.create();
- context.setCredentialsProvider(credentialsProvider);
logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
-
IOException ex = null;
int statusCode = -1;
for (int attempt = 0; attempt < retries; attempt++) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
- CloseableHttpResponse response = httpClient.execute(request, context);
- statusCode = response.getStatusLine().getStatusCode();
- if (statusCode != HttpStatus.SC_OK) {
+ String response;
+ if (request instanceof HttpGet){
+ response = getConnectionGet(request.getURI().toString(), user, password,logger);
+ } else {
+ response = getConnectionPost(request,user, password,logger);
+ }
+ if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
- String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
logger.trace("Success get response from Doris FE: {}, response is: {}.",
- request.getURI(), res);
- return res;
+ request.getURI(), response);
+ ObjectMapper mapper = new ObjectMapper();
+ Map map = mapper.readValue(response, Map.class);
+ //Handle the problem of inconsistent data format returned by http v1 and v2
+ if (map.containsKey("code") && map.containsKey("msg")) {
+ Object data = map.get("data");
+ return mapper.writeValueAsString(data);
+ } else {
+ return response;
+ }
} catch (IOException e) {
ex = e;
logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -150,6 +155,54 @@ public class RestService implements Serializable {
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}
+ private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
+ URL realUrl = new URL(request);
+ // open connection
+ HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
+ String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ connection.setRequestProperty("Authorization", "Basic " + authEncoding);
+
+ connection.connect();
+ return parseResponse(connection,logger);
+ }
+
+ private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
+ if (connection.getResponseCode() != HttpStatus.SC_OK) {
+ logger.warn("Failed to get response from Doris {}, http code is {}",
+ connection.getURL(), connection.getResponseCode());
+ throw new IOException("Failed to get response from Doris");
+ }
+ String result = "";
+ BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
+ String line;
+ while ((line = in.readLine()) != null) {
+ result += line;
+ }
+ if (in != null) {
+ in.close();
+ }
+ return result;
+ }
+
+ private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
+ URL url = new URL(request.getURI().toString());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setInstanceFollowRedirects(false);
+ conn.setRequestMethod(request.getMethod());
+ String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+ InputStream content = ((HttpPost)request).getEntity().getContent();
+ String res = IOUtils.toString(content);
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ PrintWriter out = new PrintWriter(conn.getOutputStream());
+ // send request params
+ out.print(res);
+ // flush
+ out.flush();
+ // read response
+ return parseResponse(conn,logger);
+ }
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
@@ -426,7 +479,6 @@ public class RestService implements Serializable {
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
- @VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
@@ -446,12 +498,19 @@ public class RestService implements Serializable {
}
-
+ /**
+ * translate Doris FE response to inner {@link BackendRow} struct.
+ * @param response Doris FE response
+ * @param logger {@link Logger}
+ * @return inner {@link List<BackendRow>} struct
+ * @throws DorisException,IOException throw when translate failed
+ * */
+ @VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
- List<List<String>> backend;
+ Backend backend;
try {
- backend = mapper.readValue(response, List.class);
+ backend = mapper.readValue(response, Backend.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
@@ -470,13 +529,7 @@ public class RestService implements Serializable {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
- List<BackendRow> backendRows = backend.stream().map(array -> {
- BackendRow backendRow = new BackendRow();
- backendRow.setIP(array.get(2));
- backendRow.setHttpPort(array.get(6));
- backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
- return backendRow;
- }).filter(v -> v.getAlive()).collect(Collectors.toList());
+ List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
@@ -494,7 +547,7 @@ public class RestService implements Serializable {
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
- String opaquedQueryPlan, String database, String table, Logger logger)
+ String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
diff --git a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 95ae0c2..484be45 100644
--- a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -37,6 +37,7 @@ import org.apache.doris.spark.cfg.PropertiesSettings;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
@@ -293,4 +294,23 @@ public class TestRestService {
Assert.assertEquals(expected, actual);
}
+
+ @Test
+ public void testParseBackend() throws Exception {
+ String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
+ "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
+ "\"HttpPort\",\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\"," +
+ "\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\"," +
+ "\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":" +
+ "\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\",\\\"lastStreamLoadTime\\\":" +
+ "-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\",\"DataUsedCapacity\":\"0.000 " +
+ "\",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\",\"__hrefPaths\":[\"/rest/v1/system?" +
+ "path=//backends/10002\"],\"Cluster\":\"default_cluster\",\"Alive\":\"true\",\"MaxDiskUsedPct\":" +
+ "\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\",\"ClusterDecommissioned\":\"false\"," +
+ "\"AvailCapacity\":\"1.000 B\",\"Version\":\"\",\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\"," +
+ "\"LastStartTime\":\"\\\\N\",\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":" +
+ "\"{\\\"location\\\" : \\\"default\\\"}\",\"HostName\":\"localhost\"}]}";
+ List<BackendRow> backendRows = RestService.parseBackend(response, logger);
+ Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org