You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/04/10 10:55:54 UTC

[doris-spark-connector] branch master updated: [fix] add load success flag and check after retry loop (#94)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 616664f  [fix] add load success flag and check after retry loop (#94)
616664f is described below

commit 616664f5660f4208b5fa15bafa8f9824fcbfbd7a
Author: gnehil <ad...@gmail.com>
AuthorDate: Mon Apr 10 18:55:48 2023 +0800

    [fix] add load success flag and check after retry loop (#94)
---
 .../main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala  | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e6c9960..e91e8fa 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -71,10 +71,12 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
     def flush(batch: Iterable[util.List[Object]]): Unit = {
       val loop = new Breaks
       var err: Exception = null
+      var loadSuccess: Boolean = false;
       loop.breakable {
         (1 to maxRetryTimes).foreach { i =>
           try {
             dorisStreamLoader.loadV2(batch.toList.asJava)
+            loadSuccess = true
             Thread.sleep(batchInterValMs.longValue())
             loop.break()
           } catch {
@@ -89,6 +91,9 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
                   throw new IOException("unable to flush; interrupted while doing another attempt", ex)
               }
           }
+        }
+        // check load success, if not throw exception
+        if (!loadSuccess) {
           throw new IOException(s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org