You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2022/07/20 11:26:37 UTC

[doris-spark-connector] branch master updated: [Feature]export doris write error to spark master, no need to check executor log (#44)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b97d60  [Feature]export doris write error to spark master, no need to check executor log (#44)
7b97d60 is described below

commit 7b97d60dac44192ae8ff2840ec5fcfc9cfb27c21
Author: Nick Young <my...@gmail.com>
AuthorDate: Wed Jul 20 19:26:33 2022 +0800

    [Feature]export doris write error to spark master, no need to check executor log (#44)
    
    * feat: export doris write error to spark master
    * feat: remove listToString log
---
 .../src/main/java/org/apache/doris/spark/DorisStreamLoad.java    | 2 ++
 .../scala/org/apache/doris/spark/sql/DorisSourceProvider.scala   | 9 ++++++---
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index be3c938..68e513c 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -211,6 +211,7 @@ public class DorisStreamLoad implements Serializable{
         LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
         if(loadResponse.status != 200){
+            LOG.info("Streamload Response HTTP Status Error:{}",loadResponse);
             throw new StreamLoadException("stream load error: " + loadResponse.respContent);
         }else{
             LOG.info("Streamload Response:{}",loadResponse);
@@ -218,6 +219,7 @@ public class DorisStreamLoad implements Serializable{
             try {
                 RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
                 if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
+                    LOG.info("Streamload Response RES STATUS Error:{}", loadResponse);
                     throw new StreamLoadException("stream load error: " + respContent.getMessage());
                 }
             } catch (IOException e) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 41e82f8..3eaf70a 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -65,6 +65,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
     val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
 
+    logger.info(s"maxRowCount ${maxRowCount}")
+    logger.info(s"maxRetryTimes ${maxRetryTimes}")
+
     data.rdd.foreachPartition(partition => {
       val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
       partition.foreach(row => {
@@ -89,6 +92,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
        */
       def flush = {
         val loop = new Breaks
+        var err: Exception = null
         loop.breakable {
 
           for (i <- 1 to maxRetryTimes) {
@@ -103,10 +107,10 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
                   logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
                   //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
                   dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
+                  if (err == null) err = e
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
-                    logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
                     Thread.currentThread.interrupt()
                     throw new IOException("unable to flush; interrupted while doing another attempt", e)
                 }
@@ -114,8 +118,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
           }
 
           if (!rowsBuffer.isEmpty) {
-            logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
-            throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
           }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org