You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wy...@apache.org on 2021/03/04 09:49:16 UTC
[incubator-doris] branch master updated:
[Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark
Doris Connector is in async deserialize mode (#5336)
This is an automated email from the ASF dual-hosted git repository.
wyf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3913601 [Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark Doris Connector is in async deserialize mode (#5336)
3913601 is described below
commit 39136011c2814c48d87388a8982dadbca7c67bcd
Author: 924060929 <92...@qq.com>
AuthorDate: Thu Mar 4 17:48:59 2021 +0800
[Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark Doris Connector is in async deserialize mode (#5336)
Resolve deserialize exception when Spark Doris Connector is in async deserialize mode
Co-authored-by: lanhuajian <la...@sankuai.com>
---
.../apache/doris/spark/rdd/ScalaValueReader.scala | 39 +++++++++++++++++++---
1 file changed, 34 insertions(+), 5 deletions(-)
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 1d22c42..f3334b9 100644
--- a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -19,6 +19,7 @@ package org.apache.doris.spark.rdd
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent._
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
import scala.collection.JavaConversions._
import scala.util.Try
@@ -46,11 +47,14 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])
protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
+ protected val clientLock =
+ if (deserializeArrowToRowBatchAsync) new ReentrantLock()
+ else new NoOpLock
protected var offset = 0
protected var eos: AtomicBoolean = new AtomicBoolean(false)
protected var rowBatch: RowBatch = _
// flag indicate if support deserialize Arrow to RowBatch asynchronously
- protected var deserializeArrowToRowBatchAsync: Boolean = Try {
+ protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
@@ -123,7 +127,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
params
}
- protected val openResult: TScanOpenResult = client.openScanner(openParams)
+ protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams))
protected val contextId: String = openResult.getContext_id
protected val schema: Schema =
SchemaUtils.convertToSchema(openResult.getSelected_columns)
@@ -134,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
nextBatchParams.setContext_id(contextId)
while (!eos.get) {
nextBatchParams.setOffset(offset)
- val nextResult = client.getNext(nextBatchParams)
+ val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
val rowBatch = new RowBatch(nextResult, schema)
@@ -192,7 +196,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val nextBatchParams = new TScanNextBatchParams
nextBatchParams.setContext_id(contextId)
nextBatchParams.setOffset(offset)
- val nextResult = client.getNext(nextBatchParams)
+ val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
rowBatch = new RowBatch(nextResult, schema)
@@ -218,6 +222,31 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
def close(): Unit = {
val closeParams = new TScanCloseParams
closeParams.context_id = contextId
- client.closeScanner(closeParams)
+ lockClient(_.closeScanner(closeParams))
+ }
+
+ private def lockClient[T](action: BackendClient => T): T = {
+ clientLock.lock()
+ try {
+ action(client)
+ } finally {
+ clientLock.unlock()
+ }
+ }
+
+ private class NoOpLock extends Lock {
+ override def lock(): Unit = {}
+
+ override def lockInterruptibly(): Unit = {}
+
+ override def tryLock(): Boolean = true
+
+ override def tryLock(time: Long, unit: TimeUnit): Boolean = true
+
+ override def unlock(): Unit = {}
+
+ override def newCondition(): Condition = {
+ throw new UnsupportedOperationException("NoOpLock can't provide a condition")
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org