Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/23 09:40:09 UTC

[kyuubi] branch master updated: [KYUUBI #4754] [ARROW] Use `KyuubiArrowConverters#toBatchIterator` instead of `ArrowConverters#toBatchIterator`

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a7ca4ba [KYUUBI #4754] [ARROW] Use `KyuubiArrowConverters#toBatchIterator` instead of `ArrowConverters#toBatchIterator`
d0a7ca4ba is described below

commit d0a7ca4ba80c2995aac2143a940ece14be12527f
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Sun Apr 23 17:39:59 2023 +0800

    [KYUUBI #4754] [ARROW] Use `KyuubiArrowConverters#toBatchIterator` instead of `ArrowConverters#toBatchIterator`
    
    ### _Why are the changes needed?_
    
    To adapt to Spark 3.4.
    
    The signature of `ArrowConverters#toBatchIterator` changed in https://github.com/apache/spark/pull/38618 (since Spark 3.4).
    
    Before Spark 3.4:
    
    ```
    private[sql] def toBatchIterator(
        rowIter: Iterator[InternalRow],
        schema: StructType,
        maxRecordsPerBatch: Int,
        timeZoneId: String,
        context: TaskContext): Iterator[Array[Byte]]
    ```
    
    Since Spark 3.4:
    
    ```
    private[sql] def toBatchIterator(
        rowIter: Iterator[InternalRow],
        schema: StructType,
        maxRecordsPerBatch: Long,
        timeZoneId: String,
        context: TaskContext): ArrowBatchIterator
    ```
    
    The `maxRecordsPerBatch` parameter type changed from `Int` to `Long`, and the return type changed from `Iterator[Array[Byte]]` to `ArrowBatchIterator`.
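    
    As an illustrative sketch only (not the actual `KyuubiArrowConverters` source), the idea of this change is to keep a converter with a fixed signature inside the Kyuubi code base, so call sites no longer track the Spark-version-specific API. The parameter names and types below are assumptions inferred from the call site in this diff:
    
    ```
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType
    
    object KyuubiArrowConverters {
      // The signature stays the same regardless of which Spark version is on the
      // classpath, so toArrowBatchRdd compiles unchanged against Spark 3.1 through 3.4.
      def toBatchIterator(
          rowIter: Iterator[InternalRow],
          schema: StructType,
          maxRecordsPerBatch: Long, // assumed type
          maxEstimatedBatchSize: Long, // assumed name and type
          limit: Long, // the call site in this diff passes -1
          timeZoneId: String): Iterator[Array[Byte]] = {
        ??? // placeholder; the real converter builds Arrow record batches itself
      }
    }
    ```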
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4754 from cfmcgrady/arrow-spark34.
    
    Closes #4754
    
    a3c58d0ad [Fu Chen] fix ci
    32704c577 [Fu Chen] Revert "fix ci"
    e32311a03 [Fu Chen] fix ci
    a76af6209 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
    453b6a6b8 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
    74a9f7a9d [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
    4ce5844af [Fu Chen] adapt Spark 3.4
    
    Lead-authored-by: Fu Chen <cf...@gmail.com>
    Co-authored-by: Cheng Pan <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 6bd96676f..285a28a60 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.kyuubi
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.arrow.{ArrowConverters, KyuubiArrowConverters}
+import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -73,17 +72,20 @@ object SparkDatasetHelper extends Logging {
   def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = plan.schema
     // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark-3.1.x support.
+    // drop Spark 3.1 support.
     val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
+    // Note: we cannot pass the lazy val `maxBatchSize` directly, because the input
+    // arguments are serialized and sent to the executor side for execution.
+    val maxBatchSizePerBatch = maxBatchSize
     plan.execute().mapPartitionsInternal { iter =>
-      val context = TaskContext.get()
-      ArrowConverters.toBatchIterator(
+      KyuubiArrowConverters.toBatchIterator(
         iter,
         schemaCaptured,
         maxRecordsPerBatch,
-        timeZoneId,
-        context)
+        maxBatchSizePerBatch,
+        -1,
+        timeZoneId)
     }
   }
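
The local val `maxBatchSizePerBatch` in the hunk above is the subtle part: copying the lazy val into a plain local value forces it to be evaluated on the driver, so only the computed value is captured and serialized with the closure that runs on the executors. Below is a minimal, self-contained sketch of the same pattern; it uses only public APIs (`mapPartitions`, `RuntimeConfig`) and hypothetical names (`LazyCaptureSketch`, `maxBatchSizeLocal`), and is not the Kyuubi code itself.

```
import org.apache.spark.sql.SparkSession

object LazyCaptureSketch {
  // Driver-only lookup: evaluating this lazy val on an executor would not see the
  // driver's active session, so it must be resolved before the closure is shipped.
  lazy val maxBatchSize: Long =
    SparkSession.active.conf
      .get("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
      .toLong

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lazy-capture-sketch")
      .getOrCreate()

    // Evaluate the lazy val on the driver and capture the resulting Long;
    // only this value travels with the closure, mirroring `maxBatchSizePerBatch`.
    val maxBatchSizeLocal = maxBatchSize

    val perPartition = spark.sparkContext.parallelize(1 to 100, 4).mapPartitions { iter =>
      // No driver-side session state is needed here, just the captured number.
      Iterator.single(math.min(iter.size.toLong, maxBatchSizeLocal))
    }.collect()

    println(perPartition.mkString(", "))
    spark.stop()
  }
}
```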