You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/10 12:17:16 UTC
[kyuubi] branch master updated: [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 47353911d [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`
47353911d is described below
commit 47353911d219aab3460ad4ced14fc2483579b7cd
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Wed May 10 20:17:04 2023 +0800
[KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema`
### _Why are the changes needed?_
To adapt to Spark 3.5.
The signature of the function `ArrowUtils#toArrowSchema` was changed in https://github.com/apache/spark/pull/40988 (since Spark 3.5):
Spark 3.4 or earlier:
```scala
def toArrowSchema(schema: StructType, timeZoneId: String): Schema
```
Spark 3.5 or later:
```scala
def toArrowSchema(
schema: StructType,
timeZoneId: String,
errorOnDuplicatedFieldNames: Boolean): Schema
```
Kyuubi is not affected by the issue of duplicated nested field names, because in Arrow mode it always converts struct types to string types.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4797 from cfmcgrady/arrow-toArrowSchema.
Closes #4797
2eb881b87 [Fu Chen] auto box
f69e0b395 [Fu Chen] asInstanceOf[Object] -> new JBoolean(errorOnDuplicatedFieldNames)
84c0ed381 [Fu Chen] unnecessarily force conversions
5ca65df8e [Fu Chen] Revert "new JBoolean"
0f7a1b4bd [Fu Chen] new JBoolean
044ba421c [Fu Chen] update comment
989c3caf1 [Fu Chen] reflective call ArrowUtils.toArrowSchema
Authored-by: Fu Chen <cf...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../execution/arrow/KyuubiArrowConverters.scala | 36 ++++++++++++++++++++--
1 file changed, 34 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 8a34943cc..5930dcdfc 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.arrow
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.{Boolean => JBoolean}
import java.nio.channels.Channels
import scala.collection.JavaConverters._
@@ -26,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.arrow.vector._
import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -36,6 +38,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils
+import org.apache.kyuubi.reflection.DynMethods
+
object KyuubiArrowConverters extends SQLConfHelper with Logging {
type Batch = (Array[Byte], Long)
@@ -60,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
"slice",
0,
Long.MaxValue)
- val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+ val arrowSchema = toArrowSchema(schema, timeZoneId, true)
vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
try {
val recordBatch = MessageSerializer.deserializeRecordBatch(
@@ -238,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
context: TaskContext)
extends Iterator[Array[Byte]] {
- protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+ protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
private val allocator =
ArrowUtils.rootAllocator.newChildAllocator(
s"to${this.getClass.getSimpleName}",
@@ -312,6 +316,34 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
}
}
+ // SPARK-41971 (Spark 3.5) added an `errorOnDuplicatedFieldNames: Boolean` parameter to
+ // [[ArrowUtils.toArrowSchema]]; register both overloads so whichever one exists is resolved.
+ private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema")
+ .impl( // Spark 3.4 or earlier: toArrowSchema(StructType, String)
+ "org.apache.spark.sql.util.ArrowUtils",
+ classOf[StructType],
+ classOf[String])
+ .impl( // Spark 3.5 or later: toArrowSchema(StructType, String, Boolean)
+ "org.apache.spark.sql.util.ArrowUtils",
+ classOf[StructType],
+ classOf[String],
+ classOf[Boolean]) // primitive `boolean`, matching the Scala `Boolean` parameter
+ .build()
+
+ /** Calls [[ArrowUtils.toArrowSchema]] via the overload resolved by `toArrowSchemaMethod`.
+ * Spark < 3.5 has no `errorOnDuplicatedFieldNames`; presumably DynMethods drops it — confirm.
+ */
+ private def toArrowSchema(
+ schema: StructType,
+ timeZone: String, // forwarded as the `timeZoneId` argument
+ errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
+ toArrowSchemaMethod.invoke[ArrowSchema](
+ ArrowUtils,
+ schema,
+ timeZone,
+ errorOnDuplicatedFieldNames)
+ }
+
// for testing
def fromBatchIterator(
arrowBatchIter: Iterator[Array[Byte]],