You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/05 01:43:26 UTC
[incubator-doris] branch master updated: [HTTP][API] Add backends
info API for spark/flink connector (#6984)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 29838f0 [HTTP][API] Add backends info API for spark/flink connector (#6984)
29838f0 is described below
commit 29838f07daadb62ad74db45156ff0dbda3583b4b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Nov 5 09:43:06 2021 +0800
[HTTP][API] Add backends info API for spark/flink connector (#6984)
Doris should provide an HTTP API that returns the backends list so that connectors can submit stream loads.
The API does not perform privilege checking, so common users can use it as well.
---
docs/.vuepress/sidebar/en.js | 1 +
docs/.vuepress/sidebar/zh-CN.js | 1 +
.../http-actions/fe/backends-action.md | 70 +++++++++++++
.../http-actions/fe/backends-action.md | 70 +++++++++++++
.../org/apache/doris/flink/rest/RestService.java | 64 +++++++++++-
.../apache/doris/flink/rest/models/Backend.java | 1 +
.../apache/doris/flink/rest/models/BackendRow.java | 1 +
.../rest/models/{Backend.java => BackendV2.java} | 47 +++++++--
extension/spark-doris-connector/build.sh | 20 +++-
extension/spark-doris-connector/pom_3.0.xml | 6 ++
.../org/apache/doris/spark/DorisStreamLoad.java | 2 +-
.../org/apache/doris/spark/rest/RestService.java | 68 ++++++++++++-
.../apache/doris/spark/rest/models/Backend.java | 1 +
.../apache/doris/spark/rest/models/BackendRow.java | 3 +
.../rest/models/{Backend.java => BackendV2.java} | 47 +++++++--
.../apache/doris/spark/rest/TestRestService.java | 13 ++-
.../doris/spark/sql/TestSparkConnector.scala | 5 +
.../apache/doris/httpv2/rest/BackendsAction.java | 112 +++++++++++++++++++++
18 files changed, 504 insertions(+), 28 deletions(-)
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 39be10b..d9c2d53 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -98,6 +98,7 @@ module.exports = [
"query-profile-action",
],
},
+ "backends-action",
"bootstrap-action",
"cancel-load-action",
"check-decommission-action",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 837c04e..847784b 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -97,6 +97,7 @@ module.exports = [
"query-profile-action",
],
},
+ "backends-action",
"bootstrap-action",
"cancel-load-action",
"check-decommission-action",
diff --git a/docs/en/administrator-guide/http-actions/fe/backends-action.md b/docs/en/administrator-guide/http-actions/fe/backends-action.md
new file mode 100644
index 0000000..17589dd
--- /dev/null
+++ b/docs/en/administrator-guide/http-actions/fe/backends-action.md
@@ -0,0 +1,70 @@
+---
+{
+ "title": "Backends Action",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Backends Action
+
+## Request
+
+```
+GET /api/backends
+```
+
+## Description
+
+Backends Action returns the Backends list, including Backend's IP, PORT and other information.
+
+## Path parameters
+
+None
+
+## Query parameters
+
+* `is_alive`
+
+ Optional parameter. Whether to return only the alive BE nodes. The default is false, which means that all BE nodes are returned.
+
+## Request body
+
+None
+
+## Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": {
+ "backends": [
+ {
+ "ip": "192.1.1.1",
+ "http_port": 8040,
+ "is_alive": true
+ }
+ ]
+ },
+ "count": 0
+}
+```
diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md
new file mode 100644
index 0000000..3c76675
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md
@@ -0,0 +1,70 @@
+---
+{
+ "title": "Backends Action",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Backends Action
+
+## Request
+
+```
+GET /api/backends
+```
+
+## Description
+
+Backends Action 返回 Backends 列表,包括 Backend 的 IP、PORT 等信息。
+
+## Path parameters
+
+无
+
+## Query parameters
+
+* `is_alive`
+
+ 可选参数。是否返回存活的 BE 节点。默认为false,即返回所有 BE 节点。
+
+## Request body
+
+无
+
+## Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": {
+ "backends": [
+ {
+ "ip": "192.1.1.1",
+ "http_port": 8040,
+ "is_alive": true
+ }
+ ]
+ },
+ "count": 0
+}
+```
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 184afd3..1e6310c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
+import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
@@ -83,7 +84,9 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
+ @Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";
+    // The query parameter must be spelled "is_alive": that is the name the FE BackendsAction reads.
+    // With any other spelling the parameter is not seen and the alive-only filter stays off.
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String FE_LOGIN = "/rest/v1/login";
/**
@@ -250,25 +253,29 @@ public class RestService implements Serializable {
*/
@VisibleForTesting
public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
- List<BackendRow> backends = getBackends(options, readOptions, logger);
+ List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
Collections.shuffle(backends);
- BackendRow backend = backends.get(0);
- return backend.getIP() + ":" + backend.getHttpPort();
+ BackendV2.BackendRowV2 backend = backends.get(0);
+ return backend.getIp() + ":" + backend.getHttpPort();
}
/**
- * get Doris BE nodes to request.
+ * get Doris BE nodes to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
+ *
+ * @deprecated this method needs ADMIN_PRIV to get backends, which is not suitable for common users.
+ * Use getBackendsV2 instead.
*/
+ @Deprecated
@VisibleForTesting
static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
@@ -281,6 +288,7 @@ public class RestService implements Serializable {
return backends;
}
+ @Deprecated
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
ObjectMapper mapper = new ObjectMapper();
Backend backend;
@@ -310,6 +318,54 @@ public class RestService implements Serializable {
}
/**
+ * get the Doris BE node list from one FE node through the {@code BACKENDS_V2} http endpoint.
+ *
+ * @param options configuration of request, {@code getFenodes()} supplies the FE endpoints
+ * @param readOptions read options, passed through to {@code send}
+ * @param logger slf4j logger
+ * @return the BE rows produced by {@code parseBackendV2}; no single node is chosen here
+ * @throws DorisException when the response cannot be parsed, see {@code parseBackendV2}
+ * @throws IOException declared by this method's signature
+ */
+ @VisibleForTesting
+ static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+ String feNodes = options.getFenodes();
+ // One endpoint out of the configured FE list is used for the request.
+ String feNode = randomEndpoint(feNodes, logger);
+ String beUrl = "http://" + feNode + BACKENDS_V2;
+ HttpGet httpGet = new HttpGet(beUrl);
+ String response = send(options, readOptions, httpGet, logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+ return backends;
+ }
+
+ /**
+ * translate a json response to a list of {@link BackendV2.BackendRowV2}.
+ *
+ * @param response json string to deserialize into {@link BackendV2}
+ * @param logger slf4j logger
+ * @return the value of {@code BackendV2#getBackends()}, which is null when the json carries no "backends" field
+ * @throws DorisException when the response is not valid json or cannot be mapped to {@link BackendV2}
+ * @throws ShouldNeverHappenException when deserialization yields null
+ */
+ static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ BackendV2 backend;
+ try {
+ backend = mapper.readValue(response, BackendV2.class);
+ // The two Jackson exceptions are IOException subclasses, so they must stay ahead of the
+ // general IOException handler for each failure to keep its own message.
+ } catch (JsonParseException e) {
+ String errMsg = "Doris BE's response is not a json. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (JsonMappingException e) {
+ String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (IOException e) {
+ String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ }
+
+ // readValue returns null for a literal json "null" document.
+ if (backend == null) {
+ logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+ throw new ShouldNeverHappenException();
+ }
+ List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+ // NOTE(review): the message says "schema" but the value logged is the backend list;
+ // looks copied from the schema parser -- confirm before rewording.
+ logger.debug("Parsing schema result is '{}'.", backendRows);
+ return backendRows;
+ }
+
+ /**
* get a valid URI to connect Doris FE.
*
* @param options configuration of request
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
index d74e46f..d91614f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
@@ -25,6 +25,7 @@ import java.util.List;
/**
* Be response model
**/
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
index 5b7df99..3dd0471 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendRow {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
similarity index 52%
copy from extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
copy to extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index d74e46f..5efb85e 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -26,16 +26,49 @@ import java.util.List;
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
- @JsonProperty(value = "rows")
- private List<BackendRow> rows;
+ @JsonProperty(value = "backends")
+ private List<BackendRowV2> backends;
- public List<BackendRow> getRows() {
- return rows;
+ public List<BackendRowV2> getBackends() {
+ return backends;
}
- public void setRows(List<BackendRow> rows) {
- this.rows = rows;
+ public void setBackends(List<BackendRowV2> backends) {
+ this.backends = backends;
+ }
+
+    /**
+     * One backend entry of the backends list returned by Doris FE.
+     * <p>
+     * Unknown json properties are ignored on purpose: the {@code @JsonIgnoreProperties} on the
+     * enclosing {@code BackendV2} does not extend to this nested type, so without it a default
+     * {@code ObjectMapper} would fail as soon as FE adds another field to a backend row.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        /** Backend host, json key {@code ip}. */
+        @JsonProperty("ip")
+        public String ip;
+        /** Backend http port, json key {@code http_port}. */
+        @JsonProperty("http_port")
+        public int httpPort;
+        /** Liveness flag, json key {@code is_alive}. */
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
+
+        /** Readable form, so that log statements printing the backend list show the actual nodes. */
+        @Override
+        public String toString() {
+            return "BackendRowV2{ip='" + ip + "', httpPort=" + httpPort + ", isAlive=" + isAlive + "}";
+        }
+    }
}
diff --git a/extension/spark-doris-connector/build.sh b/extension/spark-doris-connector/build.sh
index b4ea042..d6ba435 100755
--- a/extension/spark-doris-connector/build.sh
+++ b/extension/spark-doris-connector/build.sh
@@ -39,7 +39,6 @@ fi
# check maven
MVN_CMD=mvn
-
if [[ ! -z ${CUSTOM_MVN} ]]; then
MVN_CMD=${CUSTOM_MVN}
fi
@@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then
exit 1
fi
export MVN_CMD
-if [ $1 == 3 ]
+
+# Print the supported invocations (2 = spark 2.x, 3 = spark 3.x) and exit with status 1.
+usage() {
+ echo "
+ Eg.
+ $0 2 build with spark 2.x
+ $0 3 build with spark 3.x
+ "
+ exit 1
+}
+
+# Reject a missing or unsupported version argument up front. Without this, a value other
+# than 2 or 3 matches neither build branch below and the script ends successfully
+# without having built anything.
+if [ $# -eq 0 ] || { [ "$1" != "2" ] && [ "$1" != "3" ]; }; then
+    usage
+fi
+
+
+if [ "$1"x == "3x" ]
then
${MVN_CMD} clean package -f pom_3.0.xml
fi
-if [ $1 == 2 ]
+if [ "$1"x == "2x" ]
then
${MVN_CMD} clean package
fi
diff --git a/extension/spark-doris-connector/pom_3.0.xml b/extension/spark-doris-connector/pom_3.0.xml
index 6c8eee5..d208ad0 100644
--- a/extension/spark-doris-connector/pom_3.0.xml
+++ b/extension/spark-doris-connector/pom_3.0.xml
@@ -161,6 +161,12 @@
<version>4.1.27.Final</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 117825c..4411fbc 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{
}
public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
- String hostPort = RestService.randomBackend(settings, LOG);
+ String hostPort = RestService.randomBackendV2(settings, LOG);
this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index bb91538..e8a2956 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -62,6 +62,7 @@ import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
@@ -86,8 +87,9 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
+ @Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";
-
+ private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
/**
* send request to Doris FE and get response json string.
@@ -478,14 +480,17 @@ public class RestService implements Serializable {
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
+ * @deprecated use {@link #randomBackendV2(SparkSettings, Logger)} instead
*/
+ @Deprecated
+ @VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
- String beUrl = String.format("http://%s" + BACKENDS,feNode);
+ String beUrl = String.format("http://%s" + BACKENDS, feNode);
HttpGet httpGet = new HttpGet(beUrl);
- String response = send(sparkSettings,httpGet, logger);
- logger.info("Backend Info:{}",response);
+ String response = send(sparkSettings, httpGet, logger);
+ logger.info("Backend Info:{}", response);
List<BackendRow> backends = parseBackend(response, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
@@ -497,7 +502,6 @@ public class RestService implements Serializable {
return backend.getIP() + ":" + backend.getHttpPort();
}
-
/**
* translate Doris FE response to inner {@link BackendRow} struct.
* @param response Doris FE response
@@ -505,6 +509,7 @@ public class RestService implements Serializable {
* @return inner {@link List<BackendRow>} struct
* @throws DorisException,IOException throw when translate failed
* */
+ @Deprecated
@VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
@@ -535,6 +540,59 @@ public class RestService implements Serializable {
}
/**
+     * choose a Doris BE node to request.
+     * <p>
+     * The backend list is requested from an FE endpoint through {@code BACKENDS_V2}, parsed by
+     * {@code parseBackendV2}, shuffled, and the first entry is returned. This method is called by
+     * {@code DorisStreamLoad}, so it is regular production code and carries no
+     * {@code @VisibleForTesting} marker.
+     *
+     * @param sparkSettings settings that provide the {@code DORIS_FENODES} property
+     * @param logger slf4j logger
+     * @return the chosen Doris BE node, formatted as {@code ip:http_port}
+     * @throws IllegalArgumentException when the parsed backend list is null or empty
+     * @throws DorisException when the FE response cannot be parsed
+     */
+    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
+        String feNode = randomEndpoint(feNodes, logger);
+        // Plain concatenation instead of String.format: the endpoint constant is not a format
+        // pattern, and concatenating it into one would break on any '%' it might ever contain.
+        String beUrl = "http://" + feNode + BACKENDS_V2;
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(sparkSettings, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+        logger.trace("Parse beNodes '{}'.", backends);
+        if (backends == null || backends.isEmpty()) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
+            throw new IllegalArgumentException("beNodes", String.valueOf(backends));
+        }
+        // Shuffle and take the head to spread requests over the returned backends.
+        Collections.shuffle(backends);
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
+    }
+
+ /**
+ * translate a json response to a list of {@link BackendV2.BackendRowV2}.
+ * @param response json string to deserialize into {@link BackendV2}
+ * @param logger {@link Logger}
+ * @return the value of {@code BackendV2#getBackends()}, which is null when the json carries no "backends" field
+ * @throws DorisException throw when the response is not valid json or cannot be mapped to {@link BackendV2}
+ * @throws ShouldNeverHappenException throw when deserialization yields null
+ * */
+ static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
+ com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
+ BackendV2 backend;
+ try {
+ backend = mapper.readValue(response, BackendV2.class);
+ // The two Jackson exceptions are IOException subclasses, so they must stay ahead of the
+ // general IOException handler for each failure to keep its own message.
+ } catch (com.fasterxml.jackson.core.JsonParseException e) {
+ String errMsg = "Doris BE's response is not a json. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
+ String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (IOException e) {
+ String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ }
+
+ // readValue returns null for a literal json "null" document.
+ if (backend == null) {
+ logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+ throw new ShouldNeverHappenException();
+ }
+ List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+ // NOTE(review): the message says "schema" but the value logged is the backend list;
+ // looks copied from the schema parser -- confirm before rewording.
+ logger.debug("Parsing schema result is '{}'.", backendRows);
+ return backendRows;
+ }
+
+ /**
* translate BE tablets map to Doris RDD partition.
* @param cfg configuration of request
* @param be2Tablets BE to tablets {@link Map}
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
index 122e71c..322202d 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
@@ -23,6 +23,7 @@ import java.util.List;
/**
* Be response model
**/
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
index 0e2b385..a84ad2c 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.rest.models;
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendRow {
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
similarity index 52%
copy from extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
copy to extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
index 122e71c..75a2514 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
@@ -24,16 +24,49 @@ import java.util.List;
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
- @JsonProperty(value = "rows")
- private List<BackendRow> rows;
+ @JsonProperty(value = "backends")
+ private List<BackendRowV2> backends;
- public List<BackendRow> getRows() {
- return rows;
+ public List<BackendRowV2> getBackends() {
+ return backends;
}
- public void setRows(List<BackendRow> rows) {
- this.rows = rows;
+    /**
+     * Set the backend rows held by this response model.
+     *
+     * @param backends rows to store, replacing the current list
+     */
+    public void setBackends(List<BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    /**
+     * Same effect as {@link #setBackends(List)}.
+     *
+     * @param rows rows to store, replacing the current list
+     * @deprecated the name is a leftover from the {@code Backend} model this class was copied from,
+     *             whose property was "rows"; use {@link #setBackends(List)}, which matches the
+     *             "backends" property and {@link #getBackends()}.
+     */
+    @Deprecated
+    public void setRows(List<BackendRowV2> rows) {
+        this.backends = rows;
+    }
+
+    /**
+     * One backend entry of the backends list returned by Doris FE.
+     * <p>
+     * Unknown json properties are ignored on purpose: the {@code @JsonIgnoreProperties} on the
+     * enclosing {@code BackendV2} does not extend to this nested type, so without it a default
+     * {@code ObjectMapper} would fail as soon as FE adds another field to a backend row.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        /** Backend host, json key {@code ip}. */
+        @JsonProperty("ip")
+        public String ip;
+        /** Backend http port, json key {@code http_port}. */
+        @JsonProperty("http_port")
+        public int httpPort;
+        /** Liveness flag, json key {@code is_alive}. */
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
+
+        /** Readable form, so that log statements printing the backend list show the actual nodes. */
+        @Override
+        public String toString() {
+            return "BackendRowV2{ip='" + ip + "', httpPort=" + httpPort + ", isAlive=" + isAlive + "}";
+        }
+    }
}
diff --git a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 484be45..22d542a 100644
--- a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
@@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.junit.Ignore;
+
public class TestRestService {
private static Logger logger = LoggerFactory.getLogger(TestRestService.class);
@@ -295,7 +298,8 @@ public class TestRestService {
Assert.assertEquals(expected, actual);
}
- @Test
+ @Deprecated
+ @Ignore
public void testParseBackend() throws Exception {
String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
"\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
@@ -313,4 +317,11 @@ public class TestRestService {
List<BackendRow> backendRows = RestService.parseBackend(response, logger);
Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
}
+
+    /**
+     * parseBackendV2 must return one row per json entry and map the snake_case keys
+     * (ip, http_port, is_alive) onto the row fields.
+     */
+    @Test
+    public void testParseBackendV2() throws Exception {
+        String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
+        List<BackendV2.BackendRowV2> backendRows = RestService.parseBackendV2(response, logger);
+        Assert.assertEquals(2, backendRows.size());
+
+        // Counting rows alone would still pass if a key were mis-mapped, so check the fields too.
+        BackendV2.BackendRowV2 first = backendRows.get(0);
+        Assert.assertEquals("192.168.1.1", first.getIp());
+        Assert.assertEquals(8042, first.getHttpPort());
+        Assert.assertTrue(first.isAlive());
+        Assert.assertEquals("192.168.1.2", backendRows.get(1).getIp());
+    }
}
diff --git a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index e0d39af..be54aa9 100644
--- a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -19,8 +19,12 @@ package org.apache.doris.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Ignore;
import org.junit.Test
+// This test needs real connection info to run.
+// Set the connection info before removing this @Ignore
+@Ignore
class TestSparkConnector {
val dorisFeNodes = "your_fe_host:8030"
val dorisUser = "root"
@@ -107,3 +111,4 @@ class TestSparkConnector {
.start().awaitTermination()
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java
new file mode 100644
index 0000000..4d88c52
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.system.Backend;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * REST endpoint that reports the backends known to this FE.
+ * Its main consumers are the flink/spark connectors, which need backend addresses to run stream load.
+ * The handler calls {@code executeCheckPassword} and does no further privilege check of its own.
+ * <p>
+ * Response:
+ * <pre>
+ * {
+ *     "msg": "success",
+ *     "code": 0,
+ *     "data": {
+ *         "backends": [
+ *             {
+ *                 "ip": "192.1.1.1",
+ *                 "http_port": 8040,
+ *                 "is_alive": true
+ *             }
+ *         ]
+ *     },
+ *     "count": 0
+ * }
+ * </pre>
+ */
+@RestController
+public class BackendsAction extends RestBaseController {
+    public static final Logger LOG = LogManager.getLogger(BackendsAction.class);
+
+    private static final String IS_ALIVE = "is_alive";
+
+    /**
+     * Handle {@code GET /api/backends}.
+     *
+     * @param request  http request; the optional {@code is_alive} parameter is read from it
+     * @param response http response, handed to {@code executeCheckPassword}
+     * @return the result of {@code ResponseEntityBuilder.ok} for the collected {@link BackendInfo}
+     */
+    @RequestMapping(path = "/api/backends", method = {RequestMethod.GET})
+    public Object getBackends(HttpServletRequest request, HttpServletResponse response) {
+        executeCheckPassword(request, response);
+
+        // Only the literal "true" (in any letter case) switches the filter on.
+        String aliveParam = request.getParameter(IS_ALIVE);
+        boolean onlyAlive = !Strings.isNullOrEmpty(aliveParam) && aliveParam.equalsIgnoreCase("true");
+
+        BackendInfo result = new BackendInfo();
+        result.backends = Lists.newArrayList();
+        for (Long backendId : Catalog.getCurrentSystemInfo().getBackendIds(onlyAlive)) {
+            Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
+            if (backend == null) {
+                // No backend was returned for this id; leave it out of the result.
+                continue;
+            }
+            BackendRow row = new BackendRow();
+            row.ip = backend.getHost();
+            row.httpPort = backend.getHttpPort();
+            row.isAlive = backend.isAlive();
+            result.backends.add(row);
+        }
+        return ResponseEntityBuilder.ok(result);
+    }
+
+    /** Payload object: wraps the backend rows under the {@code backends} json key. */
+    @Getter
+    @Setter
+    public static class BackendInfo {
+        @JsonProperty("backends")
+        public List<BackendRow> backends;
+    }
+
+    /**
+     * One backend in the response: host, http port and liveness flag.
+     * NOTE(review): the Lombok getter for {@code isAlive} is presumably named {@code isAlive()},
+     * which Jackson may expose as an extra "alive" property next to "is_alive" -- confirm
+     * against an actual response.
+     */
+    @Getter
+    @Setter
+    public static class BackendRow {
+        @JsonProperty("ip")
+        public String ip;
+        @JsonProperty("http_port")
+        public int httpPort;
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org