Posted to commits@doris.apache.org by ji...@apache.org on 2022/12/20 05:28:41 UTC

[doris-spark-connector] branch master updated: [improvement] Randomly get a new BE address when writing one batch at a time (#59)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b797bfc  [improvement] Randomly get a new BE address when writing one batch at a time (#59)
b797bfc is described below

commit b797bfc0605648d66cf2da61c6556805a032c179
Author: lexluo09 <39...@users.noreply.github.com>
AuthorDate: Tue Dec 20 13:28:23 2022 +0800

    [improvement] Randomly get a new BE address when writing one batch at a time (#59)
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 78 +++++++++++++---------
 .../org/apache/doris/spark/rest/RestService.java   | 37 +++++-----
 .../doris/spark/sql/DorisSourceProvider.scala      |  2 -
 .../doris/spark/sql/DorisStreamLoadSink.scala      |  3 -
 4 files changed, 65 insertions(+), 55 deletions(-)
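
In short: instead of binding to a single BE host when the loader is
constructed, the connector now caches the full BE list in a Guava
LoadingCache and shuffles it to pick a random node for every Stream Load
batch, so a failed batch is naturally retried against a different node.
Below is a condensed, self-contained sketch of that mechanism; BackendPicker
and fetchBackends() are hypothetical stand-ins, while the real code caches
BackendV2.BackendRowV2 objects loaded via RestService.getBackendRows():

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    public class BackendPicker {
        // Cached BE list; entries expire 4 * 60 minutes after write,
        // mirroring cacheExpireTimeout in the commit.
        private final LoadingCache<String, List<String>> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(4 * 60, TimeUnit.MINUTES)
                .build(new CacheLoader<String, List<String>>() {
                    @Override
                    public List<String> load(String key) {
                        return fetchBackends();
                    }
                });

        // Hypothetical loader; the connector calls RestService.getBackendRows() here.
        private List<String> fetchBackends() {
            return Arrays.asList("10.0.0.1:8040", "10.0.0.2:8040", "10.0.0.3:8040");
        }

        // Shuffle the cached list and take the head: a uniformly random BE per batch.
        public String getBackend() {
            try {
                List<String> backends = cache.get("backends");
                Collections.shuffle(backends);
                return backends.get(0);
            } catch (ExecutionException e) {
                throw new RuntimeException("failed to get backend info", e);
            }
        }
    }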

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index f375c76..a868eb9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -18,11 +18,16 @@ package org.apache.doris.spark;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.StreamLoadException;
 import org.apache.doris.spark.rest.RestService;
+import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,16 +41,9 @@ import java.io.Serializable;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-import java.util.StringJoiner;
-import java.util.UUID;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Calendar;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -63,7 +61,6 @@ public class DorisStreamLoad implements Serializable{
     private String user;
     private String passwd;
     private String loadUrlStr;
-    private String hostPort;
     private String db;
     private String tbl;
     private String authEncoding;
@@ -71,9 +68,10 @@ public class DorisStreamLoad implements Serializable{
     private String[] dfColumns;
     private String maxFilterRatio;
     private Map<String,String> streamLoadProp;
+    private static final long cacheExpireTimeout = 4 * 60;
+    private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
-        this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;
         this.user = user;
@@ -83,27 +81,27 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
-        String hostPort = RestService.randomBackendV2(settings, LOG);
-        this.hostPort = hostPort;
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
         this.tbl = dbTable[1];
         this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
-        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
         this.streamLoadProp=getStreamLoadProp(settings);
-
-
-
+        cache = CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
+                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
+                    @Override
+                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
+                        return RestService.getBackendRows(settings, LOG);
+                    }
+                });
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
-        String hostPort = RestService.randomBackendV2(settings, LOG);
-        this.hostPort = hostPort;
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
         this.tbl = dbTable[1];
@@ -111,29 +109,29 @@ public class DorisStreamLoad implements Serializable{
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
 
 
-        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
         this.dfColumns = dfColumns;
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
         this.streamLoadProp=getStreamLoadProp(settings);
+        cache = CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
+                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
+                    @Override
+                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
+                        return RestService.getBackendRows(settings, LOG);
+                    }
+                });
     }
 
     public String getLoadUrlStr() {
+        if (StringUtils.isEmpty(loadUrlStr)) {
+            return "";
+        }
         return loadUrlStr;
     }
 
-    public String getHostPort() {
-        return hostPort;
-    }
-
-    public void setHostPort(String hostPort) {
-        this.hostPort = hostPort;
-        this.loadUrlStr = String.format(loadUrlPattern, hostPort, this.db, this.tbl);
-    }
-
-
     private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
         URL url = new URL(urlStr);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -228,7 +226,6 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public void load(String value) throws StreamLoadException {
-        LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
         if(loadResponse.status != 200){
             LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
@@ -255,6 +252,11 @@ public class DorisStreamLoad implements Serializable{
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));
 
+        String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
+        LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
+        // record the BE node only so it can be reported if an exception occurs
+        this.loadUrlStr = loadUrlStr;
+
         HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
         int status = -1;
@@ -304,4 +306,16 @@ public class DorisStreamLoad implements Serializable{
         }
         return streamLoadPropMap;
     }
+
+    private String getBackend() {
+        try {
+            // get backends from cache
+            List<BackendV2.BackendRowV2> backends = cache.get("backends");
+            Collections.shuffle(backends);
+            BackendV2.BackendRowV2 backend = backends.get(0);
+            return backend.getIp() + ":" + backend.getHttpPort();
+        } catch (ExecutionException e) {
+            throw new RuntimeException("failed to get backend info", e);
+        }
+    }
 }
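
One design note on the getBackend() method added above: Collections.shuffle
followed by get(0) gives every cached node equal probability, but it also
reorders the shared list stored in the cache on each call. A read-only
random-index pick, sketched below, would be an equivalent alternative; this
is purely illustrative and not what the commit does:

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    public final class RandomPick {
        // Uniform random pick that leaves the cached list untouched.
        static String pickRandom(List<String> backends) {
            int i = ThreadLocalRandom.current().nextInt(backends.size());
            return backends.get(i);
        }
    }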
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 1410ce4..b614e1a 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -484,22 +484,11 @@ public class RestService implements Serializable {
      */
     @Deprecated
     @VisibleForTesting
-    public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
-        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
-        String feNode = randomEndpoint(feNodes, logger);
-        String beUrl =   String.format("http://%s" + BACKENDS, feNode);
-        HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(sparkSettings, httpGet, logger);
-        logger.info("Backend Info:{}", response);
-        List<BackendRow> backends = parseBackend(response, logger);
-        logger.trace("Parse beNodes '{}'.", backends);
-        if (backends == null || backends.isEmpty()) {
-            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
-            throw new IllegalArgumentException("beNodes", String.valueOf(backends));
-        }
+    public static String randomBackend(SparkSettings sparkSettings, Logger logger) throws DorisException {
+        List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
         Collections.shuffle(backends);
-        BackendRow backend = backends.get(0);
-        return backend.getIP() + ":" + backend.getHttpPort();
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
     }
 
     /**
@@ -540,13 +529,13 @@ public class RestService implements Serializable {
     }
 
     /**
-     * choice a Doris BE node to request.
+     * Get the Doris BE node list.
      * @param logger slf4j logger
-     * @return the chosen one Doris BE node
+     * @return the Doris BE node list
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+    public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
         String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
         String beUrl =   String.format("http://%s" + BACKENDS_V2, feNode);
@@ -559,6 +548,18 @@ public class RestService implements Serializable {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
             throw new IllegalArgumentException("beNodes", String.valueOf(backends));
         }
+        return backends;
+    }
+
+    /**
+     * Choose a Doris BE node to request.
+     * @param logger slf4j logger
+     * @return the chosen Doris BE node
+     * @throws IllegalArgumentException if the BE node list is illegal
+     */
+    @VisibleForTesting
+    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+        List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
         Collections.shuffle(backends);
         BackendV2.BackendRowV2 backend = backends.get(0);
         return backend.getIp() + ":" + backend.getHttpPort();
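
After this refactor, both the deprecated randomBackend() and randomBackendV2()
reduce to the same shuffle-and-take-head over the newly extracted
getBackendRows(). A hypothetical caller that needs the whole BE list rather
than a single node might look like this (logBackends is an invented helper;
the SparkSettings, Logger, and Doris types are as in the diff above):

    // Fetch the BE list once and log each node (illustrative only).
    static void logBackends(SparkSettings settings, Logger logger) throws DorisException {
        List<BackendV2.BackendRowV2> backends = RestService.getBackendRows(settings, logger);
        for (BackendV2.BackendRowV2 be : backends) {
            logger.info("BE node {}:{}", be.getIp(), be.getHttpPort());
        }
    }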
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 3eaf70a..31bd1aa 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -105,8 +105,6 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
               case e: Exception =>
                 try {
                   logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
-                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
                   if (err == null) err = e
                   Thread.sleep(1000 * i)
                 } catch {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 566eb3b..afc5f31 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
-import org.apache.doris.spark.rest.RestService
 
 import scala.util.control.Breaks
 
@@ -91,8 +90,6 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
               case e: Exception =>
                 try {
                   logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
-                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings, logger))
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
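
With setHostPort() removed from both Scala sinks, each retry attempt simply
re-enters load(), which now selects a fresh random BE internally. A minimal
Java sketch of that retry shape (loadWithRetry, batch, and maxRetries are
hypothetical names; assumes maxRetries >= 1):

    // Each attempt re-enters load(), which picks a new random BE via
    // getBackend(), so no explicit host switch is needed between tries.
    static void loadWithRetry(DorisStreamLoad loader, String batch, int maxRetries)
            throws StreamLoadException, InterruptedException {
        StreamLoadException last = null;
        for (int i = 1; i <= maxRetries; i++) {
            try {
                loader.load(batch);
                return;
            } catch (StreamLoadException e) {
                last = e;                 // remember the last failure
                Thread.sleep(1000L * i);  // linear backoff, as in the Scala sinks
            }
        }
        throw last;
    }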


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org