Posted to commits@doris.apache.org by wy...@apache.org on 2021/03/04 09:49:16 UTC

[incubator-doris] branch master updated: [Spark-Doris-Connector][Bug-Fix] Resolve deserialization exception when the Spark Doris Connector is in async deserialize mode (#5336)

This is an automated email from the ASF dual-hosted git repository.

wyf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3913601  [Spark-Doris-Connector][Bug-Fix] Resolve deserialization exception when the Spark Doris Connector is in async deserialize mode (#5336)
3913601 is described below

commit 39136011c2814c48d87388a8982dadbca7c67bcd
Author: 924060929 <92...@qq.com>
AuthorDate: Thu Mar 4 17:48:59 2021 +0800

    [Spark-Doris-Connector][Bug-Fix] Resolve deserialization exception when the Spark Doris Connector is in async deserialize mode (#5336)
    
    Resolve deserialization exception when the Spark Doris Connector is in async deserialize mode
    Co-authored-by: lanhuajian <la...@sankuai.com>
---
 .../apache/doris/spark/rdd/ScalaValueReader.scala  | 39 +++++++++++++++++++---
 1 file changed, 34 insertions(+), 5 deletions(-)
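
For readers skimming the patch, the following is a condensed, self-contained sketch of the locking pattern it introduces (the reader class and FakeBackendClient below are simplified stand-ins invented for illustration; only the lockClient helper and NoOpLock mirror the actual change). The Thrift-backed BackendClient is not thread safe, so when Arrow-to-RowBatch deserialization runs on a separate thread every client call is funnelled through a ReentrantLock; in synchronous mode a NoOpLock keeps the call path identical with no locking overhead.

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}

    // Hypothetical stand-in for the non-thread-safe Thrift BackendClient.
    class FakeBackendClient {
      def getNext(offset: Long): String = s"batch@$offset"
    }

    class LockedReader(asyncDeserialize: Boolean) {
      private val client = new FakeBackendClient

      // Use a real lock only when another thread may touch the client concurrently.
      private val clientLock: Lock =
        if (asyncDeserialize) new ReentrantLock() else new NoOpLock

      def nextBatch(offset: Long): String = lockClient(_.getNext(offset))

      // Every access to the client goes through this helper, as in the patch.
      private def lockClient[T](action: FakeBackendClient => T): T = {
        clientLock.lock()
        try action(client) finally clientLock.unlock()
      }

      // Lock that does nothing, so the synchronous path pays no locking cost.
      private class NoOpLock extends Lock {
        override def lock(): Unit = {}
        override def lockInterruptibly(): Unit = {}
        override def tryLock(): Boolean = true
        override def tryLock(time: Long, unit: TimeUnit): Boolean = true
        override def unlock(): Unit = {}
        override def newCondition(): Condition =
          throw new UnsupportedOperationException("NoOpLock can't provide a condition")
      }
    }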

diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 1d22c42..f3334b9 100644
--- a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -19,6 +19,7 @@ package org.apache.doris.spark.rdd
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent._
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
 
 import scala.collection.JavaConversions._
 import scala.util.Try
@@ -46,11 +47,14 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   protected val logger = Logger.getLogger(classOf[ScalaValueReader])
 
   protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
+  protected val clientLock =
+    if (deserializeArrowToRowBatchAsync) new ReentrantLock()
+    else new NoOpLock
   protected var offset = 0
   protected var eos: AtomicBoolean = new AtomicBoolean(false)
   protected var rowBatch: RowBatch = _
  // flag indicating whether Arrow can be deserialized to a RowBatch asynchronously
-  protected var deserializeArrowToRowBatchAsync: Boolean = Try {
+  protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
     settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
   } getOrElse {
     logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
@@ -123,7 +127,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     params
   }
 
-  protected val openResult: TScanOpenResult = client.openScanner(openParams)
+  protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
   protected val contextId: String = openResult.getContext_id
   protected val schema: Schema =
     SchemaUtils.convertToSchema(openResult.getSelected_columns)
@@ -134,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
       nextBatchParams.setContext_id(contextId)
       while (!eos.get) {
         nextBatchParams.setOffset(offset)
-        val nextResult = client.getNext(nextBatchParams)
+        val nextResult = lockClient(_.getNext(nextBatchParams))
         eos.set(nextResult.isEos)
         if (!eos.get) {
           val rowBatch = new RowBatch(nextResult, schema)
@@ -192,7 +196,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
         val nextBatchParams = new TScanNextBatchParams
         nextBatchParams.setContext_id(contextId)
         nextBatchParams.setOffset(offset)
-        val nextResult = client.getNext(nextBatchParams)
+        val nextResult = lockClient(_.getNext(nextBatchParams))
         eos.set(nextResult.isEos)
         if (!eos.get) {
           rowBatch = new RowBatch(nextResult, schema)
@@ -218,6 +222,31 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   def close(): Unit = {
     val closeParams = new TScanCloseParams
     closeParams.context_id = contextId
-    client.closeScanner(closeParams)
+    lockClient(_.closeScanner(closeParams))
+  }
+
+  private def lockClient[T](action: BackendClient => T): T = {
+    clientLock.lock()
+    try {
+      action(client)
+    } finally {
+      clientLock.unlock()
+    }
+  }
+
+  private class NoOpLock extends Lock {
+    override def lock(): Unit = {}
+
+    override def lockInterruptibly(): Unit = {}
+
+    override def tryLock(): Boolean = true
+
+    override def tryLock(time: Long, unit: TimeUnit): Boolean = true
+
+    override def unlock(): Unit = {}
+
+    override def newCondition(): Condition = {
+      throw new UnsupportedOperationException("NoOpLock can't provide a condition")
+    }
   }
 }
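
A side note on the change from protected var deserializeArrowToRowBatchAsync to a lazy val above (this is a reading of the diff, not text from the commit message): clientLock is declared before the flag, and Scala evaluates strict fields in declaration order, so a non-lazy flag would still hold its default value false when the clientLock initializer reads it, and the ReentrantLock would never be chosen in async mode. Declaring the flag lazy defers its evaluation to first access, so the lock selection sees the configured value. A minimal sketch of the pitfall, with hypothetical names:

    // Strict field: usesFlag reads flag before flag's initializer has run.
    class StrictInit {
      val usesFlag: String = if (flag) "locked" else "unlocked"
      val flag: Boolean = true
    }

    // Lazy field: the forward reference forces flag's initializer on first access.
    class LazyInit {
      val usesFlag: String = if (flag) "locked" else "unlocked"
      lazy val flag: Boolean = true
    }

    object InitOrderDemo extends App {
      println(new StrictInit().usesFlag) // prints "unlocked": flag was still false
      println(new LazyInit().usesFlag)   // prints "locked"
    }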


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org