You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/10 12:17:16 UTC

[kyuubi] branch master updated: [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47353911d [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`
47353911d is described below

commit 47353911d219aab3460ad4ced14fc2483579b7cd
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Wed May 10 20:17:04 2023 +0800

    [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`
    
    ### _Why are the changes needed?_
    
    To adapt to Spark 3.5.
    
    The signature of the function `ArrowUtils#toArrowSchema` was changed in https://github.com/apache/spark/pull/40988 (since Spark 3.5).
    
    Spark 3.4 or earlier
    
    ```scala
       def toArrowSchema(schema: StructType, timeZoneId: String): Schema
    ```
    
    Spark 3.5 or later
    ```scala
       def toArrowSchema(
          schema: StructType,
          timeZoneId: String,
          errorOnDuplicatedFieldNames: Boolean): Schema
    ```
    
    Kyuubi is not affected by the issue of duplicated nested field names, as it consistently converts struct types to string types in Arrow mode.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4797 from cfmcgrady/arrow-toArrowSchema.
    
    Closes #4797
    
    2eb881b87 [Fu Chen] auto box
    f69e0b395 [Fu Chen] asInstanceOf[Object] -> new JBoolean(errorOnDuplicatedFieldNames)
    84c0ed381 [Fu Chen] unnecessarily force conversions
    5ca65df8e [Fu Chen] Revert "new JBoolean"
    0f7a1b4bd [Fu Chen] new JBoolean
    044ba421c [Fu Chen] update comment
    989c3caf1 [Fu Chen] reflective call ArrowUtils.toArrowSchema
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../execution/arrow/KyuubiArrowConverters.scala    | 36 ++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 8a34943cc..5930dcdfc 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.arrow
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.{Boolean => JBoolean}
 import java.nio.channels.Channels
 
 import scala.collection.JavaConverters._
@@ -26,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
 import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -36,6 +38,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
 
+import org.apache.kyuubi.reflection.DynMethods
+
 object KyuubiArrowConverters extends SQLConfHelper with Logging {
 
   type Batch = (Array[Byte], Long)
@@ -60,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       "slice",
       0,
       Long.MaxValue)
-    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+    val arrowSchema = toArrowSchema(schema, timeZoneId, true)
     vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
     try {
       val recordBatch = MessageSerializer.deserializeRecordBatch(
@@ -238,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       context: TaskContext)
     extends Iterator[Array[Byte]] {
 
-    protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+    protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
     private val allocator =
       ArrowUtils.rootAllocator.newChildAllocator(
         s"to${this.getClass.getSimpleName}",
@@ -312,6 +316,34 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
     }
   }
 
+  // The signature of [[ArrowUtils.toArrowSchema]] changed in SPARK-41971 (Spark 3.5), so the
+  // method is looked up reflectively; both the two- and three-argument forms are registered.
+  private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema")
+    .impl( // for Spark 3.4 or earlier
+      "org.apache.spark.sql.util.ArrowUtils",
+      classOf[StructType],
+      classOf[String])
+    .impl( // for Spark 3.5 or later
+      "org.apache.spark.sql.util.ArrowUtils",
+      classOf[StructType],
+      classOf[String],
+      classOf[Boolean])
+    .build()
+
+  /**
+   * Calls [[ArrowUtils.toArrowSchema]] reflectively; its signature changed in Spark 3.5.
+   */
+  private def toArrowSchema(
+      schema: StructType,
+      timeZone: String,
+      errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
+    toArrowSchemaMethod.invoke[ArrowSchema](
+      ArrowUtils,
+      schema,
+      timeZone,
+      errorOnDuplicatedFieldNames)
+  }
+
   // for testing
   def fromBatchIterator(
       arrowBatchIter: Iterator[Array[Byte]],