You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/11/20 06:44:02 UTC

(doris-spark-connector) branch master updated: [Fix] fix retry read buffer problem (#160)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9db6115  [Fix] fix retry read buffer problem (#160)
9db6115 is described below

commit 9db61150189fef57d89e8ba9259f9ab3541bd643
Author: gnehil <ad...@gmail.com>
AuthorDate: Mon Nov 20 14:43:56 2023 +0800

    [Fix] fix retry read buffer problem (#160)
---
 .../apache/doris/spark/writer/DorisWriter.scala    | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 59092f6..0485671 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -155,12 +155,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
     override def next(): T = {
       recordCount += 1
       if (batchRetryEnable) {
-        if (isReset && buffer.nonEmpty) {
-          buffer(recordCount)
+        if (isReset) {
+          readBuffer()
         } else {
-          val elem = iterator.next
-          buffer += elem
-          elem
+          writeBufferAndReturn()
         }
       } else {
         iterator.next
@@ -172,8 +170,10 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
      */
     def reset(): Unit = {
       recordCount = 0
-      isReset = true
-      logger.info("batch iterator is reset")
+      isReset = buffer.nonEmpty
+      if (isReset) {
+        logger.info("buffer is not empty and batch iterator is reset")
+      }
     }
 
     /**
@@ -186,6 +186,20 @@ class DorisWriter(settings: SparkSettings, preCommittedTxnAcc: CollectionAccumul
       }
     }
 
+    private def readBuffer(): T = {
+      if (recordCount == buffer.size) {
+        logger.debug("read buffer end, recordCount:{}, bufferSize: {}", recordCount, buffer.size)
+        isReset = false
+      }
+      buffer(recordCount - 1)
+    }
+
+    private def writeBufferAndReturn(): T = {
+      val elem = iterator.next
+      buffer += elem
+      elem
+    }
+
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org