You are viewing a plain text version of this content; the canonical link to the original was removed during text extraction and is available in the HTML version of this message.
Posted to commits@doris.apache.org by di...@apache.org on 2022/07/20 11:26:37 UTC
[doris-spark-connector] branch master updated: [Feature]export doris write error to spark master, no need to check executor log (#44)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7b97d60 [Feature]export doris write error to spark master, no need to check executor log (#44)
7b97d60 is described below
commit 7b97d60dac44192ae8ff2840ec5fcfc9cfb27c21
Author: Nick Young <my...@gmail.com>
AuthorDate: Wed Jul 20 19:26:33 2022 +0800
[Feature]export doris write error to spark master, no need to check executor log (#44)
* feat: export doris write error to spark master
* feat: remove listToString log
---
.../src/main/java/org/apache/doris/spark/DorisStreamLoad.java | 2 ++
.../scala/org/apache/doris/spark/sql/DorisSourceProvider.scala | 9 ++++++---
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index be3c938..68e513c 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -211,6 +211,7 @@ public class DorisStreamLoad implements Serializable{
LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
if(loadResponse.status != 200){
+ LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
}else{
LOG.info("Streamload Response:{}",loadResponse);
@@ -218,6 +219,7 @@ public class DorisStreamLoad implements Serializable{
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
+ LOG.info("Streamload Response RES STATUS Error:{}", loadResponse);
throw new StreamLoadException("stream load error: " + respContent.getMessage());
}
} catch (IOException e) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 41e82f8..3eaf70a 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -65,6 +65,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ logger.info(s"maxRowCount ${maxRowCount}")
+ logger.info(s"maxRetryTimes ${maxRetryTimes}")
+
data.rdd.foreachPartition(partition => {
val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
partition.foreach(row => {
@@ -89,6 +92,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
*/
def flush = {
val loop = new Breaks
+ var err: Exception = null
loop.breakable {
for (i <- 1 to maxRetryTimes) {
@@ -103,10 +107,10 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
+ if (err == null) err = e
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
- logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while doing another attempt", e)
}
@@ -114,8 +118,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
}
if (!rowsBuffer.isEmpty) {
- logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
- throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+ throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org