You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/12/20 05:28:41 UTC
[doris-spark-connector] branch master updated: [improvement]Randomly get a new BE address when writing one batch at a time (#59)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b797bfc [improvement]Randomly get a new BE address when writing one batch at a time (#59)
b797bfc is described below
commit b797bfc0605648d66cf2da61c6556805a032c179
Author: lexluo09 <39...@users.noreply.github.com>
AuthorDate: Tue Dec 20 13:28:23 2022 +0800
[improvement]Randomly get a new BE address when writing one batch at a time (#59)
---
.../org/apache/doris/spark/DorisStreamLoad.java | 78 +++++++++++++---------
.../org/apache/doris/spark/rest/RestService.java | 37 +++++-----
.../doris/spark/sql/DorisSourceProvider.scala | 2 -
.../doris/spark/sql/DorisStreamLoadSink.scala | 3 -
4 files changed, 65 insertions(+), 55 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index f375c76..a868eb9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -18,11 +18,16 @@ package org.apache.doris.spark;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
+import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +41,9 @@ import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-import java.util.StringJoiner;
-import java.util.UUID;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Calendar;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
@@ -63,7 +61,6 @@ public class DorisStreamLoad implements Serializable{
private String user;
private String passwd;
private String loadUrlStr;
- private String hostPort;
private String db;
private String tbl;
private String authEncoding;
@@ -71,9 +68,10 @@ public class DorisStreamLoad implements Serializable{
private String[] dfColumns;
private String maxFilterRatio;
private Map<String,String> streamLoadProp;
+ private static final long cacheExpireTimeout = 4 * 60;
+ private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
- this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
this.user = user;
@@ -83,27 +81,27 @@ public class DorisStreamLoad implements Serializable{
}
public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
- String hostPort = RestService.randomBackendV2(settings, LOG);
- this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
this.streamLoadProp=getStreamLoadProp(settings);
-
-
-
+ cache = CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
+ .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
+ @Override
+ public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
+ return RestService.getBackendRows(settings, LOG);
+ }
+ });
}
public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
- String hostPort = RestService.randomBackendV2(settings, LOG);
- this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
@@ -111,29 +109,29 @@ public class DorisStreamLoad implements Serializable{
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.dfColumns = dfColumns;
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
this.streamLoadProp=getStreamLoadProp(settings);
+ cache = CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
+ .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
+ @Override
+ public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
+ return RestService.getBackendRows(settings, LOG);
+ }
+ });
}
public String getLoadUrlStr() {
+ if (StringUtils.isEmpty(loadUrlStr)) {
+ return "";
+ }
return loadUrlStr;
}
- public String getHostPort() {
- return hostPort;
- }
-
- public void setHostPort(String hostPort) {
- this.hostPort = hostPort;
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, this.db, this.tbl);
- }
-
-
private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -228,7 +226,6 @@ public class DorisStreamLoad implements Serializable{
}
public void load(String value) throws StreamLoadException {
- LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
if(loadResponse.status != 200){
LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
@@ -255,6 +252,11 @@ public class DorisStreamLoad implements Serializable{
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
+ String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
+ LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
+ //only to record the BE node in case of an exception
+ this.loadUrlStr = loadUrlStr;
+
HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
int status = -1;
@@ -304,4 +306,16 @@ public class DorisStreamLoad implements Serializable{
}
return streamLoadPropMap;
}
+
+ private String getBackend() {
+ try {
+ //get backends from cache
+ List<BackendV2.BackendRowV2> backends = cache.get("backends");
+ Collections.shuffle(backends);
+ BackendV2.BackendRowV2 backend = backends.get(0);
+ return backend.getIp() + ":" + backend.getHttpPort();
+ } catch (ExecutionException e) {
+ throw new RuntimeException("get backends info fail",e);
+ }
+ }
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 1410ce4..b614e1a 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -484,22 +484,11 @@ public class RestService implements Serializable {
*/
@Deprecated
@VisibleForTesting
- public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
- String feNodes = sparkSettings.getProperty(DORIS_FENODES);
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = String.format("http://%s" + BACKENDS, feNode);
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(sparkSettings, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendRow> backends = parseBackend(response, logger);
- logger.trace("Parse beNodes '{}'.", backends);
- if (backends == null || backends.isEmpty()) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
- throw new IllegalArgumentException("beNodes", String.valueOf(backends));
- }
+ public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException {
+ List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
- BackendRow backend = backends.get(0);
- return backend.getIP() + ":" + backend.getHttpPort();
+ BackendV2.BackendRowV2 backend = backends.get(0);
+ return backend.getIp()+ ":" + backend.getHttpPort();
}
/**
@@ -540,13 +529,13 @@ public class RestService implements Serializable {
}
/**
- * choice a Doris BE node to request.
+ * get Doris BE node list.
* @param logger slf4j logger
- * @return the chosen one Doris BE node
+ * @return the Doris BE node list
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
- public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+ public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
@@ -559,6 +548,18 @@ public class RestService implements Serializable {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
+ return backends;
+ }
+
+ /**
+ * choose a Doris BE node to request.
+ * @param logger slf4j logger
+ * @return the chosen one Doris BE node
+ * @throws IllegalArgumentException BE nodes is illegal
+ */
+ @VisibleForTesting
+ public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
+ List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 3eaf70a..31bd1aa 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -105,8 +105,6 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
case e: Exception =>
try {
logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
- dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
if (err == null) err = e
Thread.sleep(1000 * i)
} catch {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 566eb3b..afc5f31 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
-import org.apache.doris.spark.rest.RestService
import scala.util.control.Breaks
@@ -91,8 +90,6 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
case e: Exception =>
try {
logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
- dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings, logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org