You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/04/10 10:55:54 UTC
[doris-spark-connector] branch master updated: [fix] add load success flag and check after retry loop (#94)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 616664f [fix] add load success flag and check after retry loop (#94)
616664f is described below
commit 616664f5660f4208b5fa15bafa8f9824fcbfbd7a
Author: gnehil <ad...@gmail.com>
AuthorDate: Mon Apr 10 18:55:48 2023 +0800
[fix] add load success flag and check after retry loop (#94)
---
.../main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e6c9960..e91e8fa 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -71,10 +71,12 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
def flush(batch: Iterable[util.List[Object]]): Unit = {
val loop = new Breaks
var err: Exception = null
+ var loadSuccess: Boolean = false;
loop.breakable {
(1 to maxRetryTimes).foreach { i =>
try {
dorisStreamLoader.loadV2(batch.toList.asJava)
+ loadSuccess = true
Thread.sleep(batchInterValMs.longValue())
loop.break()
} catch {
@@ -89,6 +91,9 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
throw new IOException("unable to flush; interrupted while doing another attempt", ex)
}
}
+ }
+ // check load success, if not throw exception
+ if (!loadSuccess) {
throw new IOException(s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org