You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/05 01:43:26 UTC

[incubator-doris] branch master updated: [HTTP][API] Add backends info API for spark/flink connector (#6984)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 29838f0  [HTTP][API] Add backends info API for spark/flink connector (#6984)
29838f0 is described below

commit 29838f07daadb62ad74db45156ff0dbda3583b4b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Nov 5 09:43:06 2021 +0800

    [HTTP][API] Add backends info API for spark/flink connector (#6984)
    
    Doris should provide an HTTP API that returns the backends list, so that connectors can submit stream loads.
    The API does no privilege checking, so common users can call it.
---
 docs/.vuepress/sidebar/en.js                       |   1 +
 docs/.vuepress/sidebar/zh-CN.js                    |   1 +
 .../http-actions/fe/backends-action.md             |  70 +++++++++++++
 .../http-actions/fe/backends-action.md             |  70 +++++++++++++
 .../org/apache/doris/flink/rest/RestService.java   |  64 +++++++++++-
 .../apache/doris/flink/rest/models/Backend.java    |   1 +
 .../apache/doris/flink/rest/models/BackendRow.java |   1 +
 .../rest/models/{Backend.java => BackendV2.java}   |  47 +++++++--
 extension/spark-doris-connector/build.sh           |  20 +++-
 extension/spark-doris-connector/pom_3.0.xml        |   6 ++
 .../org/apache/doris/spark/DorisStreamLoad.java    |   2 +-
 .../org/apache/doris/spark/rest/RestService.java   |  68 ++++++++++++-
 .../apache/doris/spark/rest/models/Backend.java    |   1 +
 .../apache/doris/spark/rest/models/BackendRow.java |   3 +
 .../rest/models/{Backend.java => BackendV2.java}   |  47 +++++++--
 .../apache/doris/spark/rest/TestRestService.java   |  13 ++-
 .../doris/spark/sql/TestSparkConnector.scala       |   5 +
 .../apache/doris/httpv2/rest/BackendsAction.java   | 112 +++++++++++++++++++++
 18 files changed, 504 insertions(+), 28 deletions(-)

diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 39be10b..d9c2d53 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -98,6 +98,7 @@ module.exports = [
                     "query-profile-action",
                   ],
                 },
+                "backends-action",
                 "bootstrap-action",
                 "cancel-load-action",
                 "check-decommission-action",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 837c04e..847784b 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -97,6 +97,7 @@ module.exports = [
                     "query-profile-action",
                   ],
                 },
+                "backends-action",
                 "bootstrap-action",
                 "cancel-load-action",
                 "check-decommission-action",
diff --git a/docs/en/administrator-guide/http-actions/fe/backends-action.md b/docs/en/administrator-guide/http-actions/fe/backends-action.md
new file mode 100644
index 0000000..17589dd
--- /dev/null
+++ b/docs/en/administrator-guide/http-actions/fe/backends-action.md
@@ -0,0 +1,70 @@
+---
+{
+    "title": "Backends Action",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Backends Action
+
+## Request
+
+```
+GET /api/backends
+```
+
+## Description
+
+Backends Action returns the Backends list, including Backend's IP, PORT and other information.
+
+## Path parameters
+
+None
+
+## Query parameters
+
+* `is_alive`
+
+    Optional. Whether to return only the BE nodes that are alive. Defaults to false, which means that all BE nodes are returned.
+
+## Request body
+
+None
+
+## Response
+
+```
+{
+    "msg": "success",
+    "code": 0,
+    "data": {
+        "backends": [
+            {
+                "ip": "192.1.1.1",
+                "http_port": 8040,
+                "is_alive": true
+            }
+        ]
+    },
+    "count": 0
+}
+```
diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md
new file mode 100644
index 0000000..3c76675
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md
@@ -0,0 +1,70 @@
+---
+{
+    "title": "Backends Action",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Backends Action
+
+## Request
+
+```
+GET /api/backends
+```
+
+## Description
+
+Backends Action 返回 Backends 列表,包括 Backend 的 IP、PORT 等信息。
+    
+## Path parameters
+
+无
+
+## Query parameters
+
+* `is_alive`
+
+    可选参数。是否仅返回存活的 BE 节点。默认为 false,即返回所有 BE 节点。
+
+## Request body
+
+无
+
+## Response
+    
+```
+{
+    "msg": "success", 
+    "code": 0, 
+    "data": {
+        "backends": [
+            {
+                "ip": "192.1.1.1",
+                "http_port": 8040, 
+                "is_alive": true
+            }
+        ]
+    }, 
+    "count": 0
+}
+```
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 184afd3..1e6310c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.ShouldNeverHappenException;
 import org.apache.doris.flink.rest.models.Backend;
 import org.apache.doris.flink.rest.models.BackendRow;
+import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
@@ -83,7 +84,9 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
+    @Deprecated
     private static final String BACKENDS = "/rest/v1/system?path=//backends";
+    // The query key must be spelled "is_alive", the name the FE's BackendsAction reads; with any other key the
+    // FE does not see the flag and returns all backends, including those that are not alive.
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
     private static final String FE_LOGIN = "/rest/v1/login";
 
     /**
@@ -250,25 +253,29 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
-        List<BackendRow> backends = getBackends(options, readOptions, logger);
+        List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
             throw new IllegalArgumentException("beNodes", String.valueOf(backends));
         }
         Collections.shuffle(backends);
-        BackendRow backend = backends.get(0);
-        return backend.getIP() + ":" + backend.getHttpPort();
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
     }
 
     /**
-     * get  Doris BE nodes to request.
+     * get Doris BE nodes to request.
      *
      * @param options configuration of request
      * @param logger  slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
+     *
+     * @deprecated Fetching backends this way needs ADMIN_PRIV, which is not suitable for common users.
+     *     Use {@link #getBackendsV2} instead.
      */
+    @Deprecated
     @VisibleForTesting
     static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
         String feNodes = options.getFenodes();
@@ -281,6 +288,7 @@ public class RestService implements Serializable {
         return backends;
     }
 
+    @Deprecated
     static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
         ObjectMapper mapper = new ObjectMapper();
         Backend backend;
@@ -310,6 +318,54 @@ public class RestService implements Serializable {
     }
 
     /**
+     * get Doris BE nodes to request.
+     *
+     * @param options configuration of request
+     * @param logger  slf4j logger
+     * @return the chosen one Doris BE node
+     * @throws IllegalArgumentException BE nodes is illegal
+     */
+    @VisibleForTesting
+    static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+        String feNodes = options.getFenodes();
+        String feNode = randomEndpoint(feNodes, logger);
+        String beUrl = "http://" + feNode + BACKENDS_V2;
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(options, readOptions, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+        return backends;
+    }
+
+    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        BackendV2 backend;
+        try {
+            backend = mapper.readValue(response, BackendV2.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+
+        if (backend == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        logger.debug("Parsing schema result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    /**
      * get a valid URI to connect Doris FE.
      *
      * @param options configuration of request
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
index d74e46f..d91614f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
@@ -25,6 +25,7 @@ import java.util.List;
 /**
  * Be response model
  **/
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Backend {
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
index 5b7df99..3dd0471 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BackendRow {
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
similarity index 52%
copy from extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
copy to extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index d74e46f..5efb85e 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -26,16 +26,49 @@ import java.util.List;
  * Be response model
  **/
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
 
-    @JsonProperty(value = "rows")
-    private List<BackendRow> rows;
+    @JsonProperty(value = "backends")
+    private List<BackendRowV2> backends;
 
-    public List<BackendRow> getRows() {
-        return rows;
+    public List<BackendRowV2> getBackends() {
+        return backends;
     }
 
-    public void setRows(List<BackendRow> rows) {
-        this.rows = rows;
+    public void setBackends(List<BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    /**
+     * One entry of the {@code backends} array in the {@code /api/backends} response.
+     * <p>
+     * Unknown JSON properties are ignored, like in the enclosing class, so that a field added to the
+     * FE response later does not make deserialization fail.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        /** Value of the {@code ip} property. */
+        @JsonProperty("ip")
+        public String ip;
+        /** Value of the {@code http_port} property. */
+        @JsonProperty("http_port")
+        public int httpPort;
+        /** Value of the {@code is_alive} property. */
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
+    }
 }
diff --git a/extension/spark-doris-connector/build.sh b/extension/spark-doris-connector/build.sh
index b4ea042..d6ba435 100755
--- a/extension/spark-doris-connector/build.sh
+++ b/extension/spark-doris-connector/build.sh
@@ -39,7 +39,6 @@ fi
 # check maven
 MVN_CMD=mvn
 
-
 if [[ ! -z ${CUSTOM_MVN} ]]; then
     MVN_CMD=${CUSTOM_MVN}
 fi
@@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then
     exit 1
 fi
 export MVN_CMD
-if [ $1 == 3 ]
+
+usage() {
+  echo "
+  Eg.
+    $0 2            build with spark 2.x
+    $0 3            build with spark 3.x
+  "
+  exit 1
+}
+
+if [ $# == 0 ]; then
+    usage
+fi
+
+
+if [ "$1"x == "3x" ]
 then
    ${MVN_CMD} clean package -f pom_3.0.xml
 fi
-if [ $1 == 2 ]
+if [ "$1"x == "2x" ]
 then
    ${MVN_CMD} clean package
 fi
diff --git a/extension/spark-doris-connector/pom_3.0.xml b/extension/spark-doris-connector/pom_3.0.xml
index 6c8eee5..d208ad0 100644
--- a/extension/spark-doris-connector/pom_3.0.xml
+++ b/extension/spark-doris-connector/pom_3.0.xml
@@ -161,6 +161,12 @@
             <version>4.1.27.Final</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 117825c..4411fbc 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
-        String hostPort = RestService.randomBackend(settings, LOG);
+        String hostPort = RestService.randomBackendV2(settings, LOG);
         this.hostPort = hostPort;
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index bb91538..e8a2956 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -62,6 +62,7 @@ import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
 import org.apache.doris.spark.rest.models.Backend;
 import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
@@ -86,8 +87,9 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
+    @Deprecated
     private static final String BACKENDS = "/rest/v1/system?path=//backends";
-
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
 
     /**
      * send request to Doris FE and get response json string.
@@ -478,14 +480,17 @@ public class RestService implements Serializable {
      * @param logger slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
+     * @deprecated use {@link #randomBackendV2(SparkSettings, Logger)} instead
      */
+    @Deprecated
+    @VisibleForTesting
     public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
         String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
-        String beUrl =   String.format("http://%s" + BACKENDS,feNode);
+        String beUrl =   String.format("http://%s" + BACKENDS, feNode);
         HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(sparkSettings,httpGet, logger);
-        logger.info("Backend Info:{}",response);
+        String response = send(sparkSettings, httpGet, logger);
+        logger.info("Backend Info:{}", response);
         List<BackendRow> backends = parseBackend(response, logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
@@ -497,7 +502,6 @@ public class RestService implements Serializable {
         return backend.getIP() + ":" + backend.getHttpPort();
     }
 
-
     /**
      * translate Doris FE response to inner {@link BackendRow} struct.
      * @param response Doris FE response
@@ -505,6 +509,7 @@ public class RestService implements Serializable {
      * @return inner {@link List<BackendRow>} struct
      * @throws DorisException,IOException throw when translate failed
      * */
+    @Deprecated
     @VisibleForTesting
     static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
         com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
@@ -535,6 +540,59 @@ public class RestService implements Serializable {
     }
 
     /**
+     * choice a Doris BE node to request.
+     * @param logger slf4j logger
+     * @return the chosen one Doris BE node
+     * @throws IllegalArgumentException BE nodes is illegal
+     */
+    @VisibleForTesting
+    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
+        String feNode = randomEndpoint(feNodes, logger);
+        String beUrl =   String.format("http://%s" + BACKENDS_V2, feNode);
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(sparkSettings, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+        logger.trace("Parse beNodes '{}'.", backends);
+        if (backends == null || backends.isEmpty()) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
+            throw new IllegalArgumentException("beNodes", String.valueOf(backends));
+        }
+        Collections.shuffle(backends);
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
+    }
+
+    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
+        com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
+        BackendV2 backend;
+        try {
+            backend = mapper.readValue(response, BackendV2.class);
+        } catch (com.fasterxml.jackson.core.JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+
+        if (backend == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        logger.debug("Parsing schema result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    /**
      * translate BE tablets map to Doris RDD partition.
      * @param cfg configuration of request
      * @param be2Tablets BE to tablets {@link Map}
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
index 122e71c..322202d 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
@@ -23,6 +23,7 @@ import java.util.List;
 /**
  * Be response model
  **/
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Backend {
 
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
index 0e2b385..a84ad2c 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 package org.apache.doris.spark.rest.models;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BackendRow {
 
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
similarity index 52%
copy from extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
copy to extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
index 122e71c..75a2514 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
@@ -24,16 +24,49 @@ import java.util.List;
  * Be response model
  **/
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
 
-    @JsonProperty(value = "rows")
-    private List<BackendRow> rows;
+    @JsonProperty(value = "backends")
+    private List<BackendRowV2> backends;
 
-    public List<BackendRow> getRows() {
-        return rows;
+    public List<BackendRowV2> getBackends() {
+        return backends;
     }
 
-    public void setRows(List<BackendRow> rows) {
-        this.rows = rows;
+    /**
+     * Replaces the backend rows.
+     *
+     * @param backends rows to store
+     */
+    public void setBackends(List<BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    /**
+     * Setter under the name inherited from the {@code rows} field of {@code Backend}, the class this one
+     * was copied from. Kept so that existing callers keep compiling.
+     *
+     * @param rows rows to store
+     * @deprecated use {@link #setBackends(List)}, whose name matches {@link #getBackends()}
+     */
+    @Deprecated
+    public void setRows(List<BackendRowV2> rows) {
+        setBackends(rows);
+    }
+
+    /**
+     * One entry of the {@code backends} array in the {@code /api/backends} response.
+     * <p>
+     * Unknown JSON properties are ignored, like in the enclosing class, so that a field added to the
+     * FE response later does not make deserialization fail.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        /** Value of the {@code ip} property. */
+        @JsonProperty("ip")
+        public String ip;
+        /** Value of the {@code http_port} property. */
+        @JsonProperty("http_port")
+        public int httpPort;
+        /** Value of the {@code is_alive} property. */
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
+    }
 }
diff --git a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 484be45..22d542a 100644
--- a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
@@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.junit.Ignore;
+
 public class TestRestService {
     private static Logger logger = LoggerFactory.getLogger(TestRestService.class);
 
@@ -295,7 +298,8 @@ public class TestRestService {
         Assert.assertEquals(expected, actual);
     }
 
-    @Test
+    @Deprecated
+    @Ignore
     public void testParseBackend() throws Exception {
         String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
                 "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
@@ -313,4 +317,11 @@ public class TestRestService {
         List<BackendRow> backendRows = RestService.parseBackend(response, logger);
         Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
     }
+
+    /** A response listing two backends has to be parsed into two rows. */
+    @Test
+    public void testParseBackendV2() throws Exception {
+        String twoBackends = "{\"backends\":["
+                + "{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, "
+                + "{\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}"
+                + "]}";
+        List<BackendV2.BackendRowV2> parsed = RestService.parseBackendV2(twoBackends, logger);
+        Assert.assertEquals(2, parsed.size());
+    }
 }
diff --git a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index e0d39af..be54aa9 100644
--- a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -19,8 +19,12 @@ package org.apache.doris.spark.sql
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Ignore;
 import org.junit.Test
 
+// This test needs real connection info to run.
+// Fill in the connection info below before removing this @Ignore.
+@Ignore
 class TestSparkConnector {
   val dorisFeNodes = "your_fe_host:8030"
   val dorisUser = "root"
@@ -107,3 +111,4 @@ class TestSparkConnector {
       .start().awaitTermination()
   }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java
new file mode 100644
index 0000000..4d88c52
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.system.Backend;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * REST controller that returns the current backends info.
+ * Mainly used by the flink/spark connectors, which need backends info to execute stream load.
+ * Only the password is checked; no further auth check is done.
+ * <p>
+ * Response:
+ * <pre>
+ * {
+ *     "msg": "success",
+ *     "code": 0,
+ *     "data": {
+ *         "backends": [
+ *             {
+ *                 "ip": "192.1.1.1",
+ *                 "http_port": 8040,
+ *                 "is_alive": true
+ *             }
+ *         ]
+ *     },
+ *     "count": 0
+ * }
+ * </pre>
+ */
+@RestController
+public class BackendsAction extends RestBaseController {
+    public static final Logger LOG = LogManager.getLogger(BackendsAction.class);
+
+    private static final String IS_ALIVE = "is_alive";
+
+    /**
+     * Handles {@code GET /api/backends}.
+     *
+     * @param request  incoming request; its optional {@code is_alive} parameter is forwarded to
+     *                 {@code getBackendIds} as a flag that is set only for the value "true" (case-insensitive)
+     * @param response servlet response, handed to the password check
+     * @return the result of {@code ResponseEntityBuilder.ok} for the collected {@link BackendInfo}
+     */
+    @RequestMapping(path = "/api/backends", method = {RequestMethod.GET})
+    public Object getBackends(HttpServletRequest request, HttpServletResponse response) {
+        executeCheckPassword(request, response);
+
+        // Boolean.parseBoolean yields true only for a non-null value equal to "true", ignoring case.
+        boolean aliveOnly = Boolean.parseBoolean(request.getParameter(IS_ALIVE));
+
+        BackendInfo result = new BackendInfo();
+        result.backends = Lists.newArrayList();
+        for (Long backendId : Catalog.getCurrentSystemInfo().getBackendIds(aliveOnly)) {
+            Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
+            if (backend == null) {
+                // The lookup returned nothing for this id; leave it out of the result.
+                continue;
+            }
+            BackendRow row = new BackendRow();
+            row.ip = backend.getHost();
+            row.httpPort = backend.getHttpPort();
+            row.isAlive = backend.isAlive();
+            result.backends.add(row);
+        }
+        return ResponseEntityBuilder.ok(result);
+    }
+
+    /** Payload of the response: the list of backend rows. */
+    @Getter
+    @Setter
+    public static class BackendInfo {
+        @JsonProperty("backends")
+        public List<BackendRow> backends;
+    }
+
+    /** One backend: its host, http port and alive flag. */
+    @Getter
+    @Setter
+    public static class BackendRow {
+        @JsonProperty("ip")
+        public String ip;
+        @JsonProperty("http_port")
+        public int httpPort;
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org