Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/03/26 04:52:41 UTC

(kyuubi) branch branch-1.8 updated: [KYUUBI #6207] Support to retrieve Spark UserDefinedType result

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 89246c5d9 [KYUUBI #6207] Support to retrieve Spark UserDefinedType result
89246c5d9 is described below

commit 89246c5d910ac884755ab1e5199426013ab60342
Author: Wang, Fei <fw...@ebay.com>
AuthorDate: Mon Mar 25 21:47:54 2024 -0700

    [KYUUBI #6207] Support to retrieve Spark UserDefinedType result
    
    This fixes the following error, raised while retrieving result set metadata for a query that returns a Spark UserDefinedType column:
    ```
    24/03/25 00:47:10 ERROR SparkTBinaryFrontendService: Error getting result set metadata:
    java.lang.IllegalArgumentException: Unrecognized type name: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
    ```
    Screenshot (before): https://github.com/apache/kyuubi/assets/6757692/9067d2d2-06d9-4937-b328-71434def34fd
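    
    For context, such UDT columns commonly come from Spark ML: the struct in the error above matches the underlying sqlType of `org.apache.spark.ml.linalg.VectorUDT`. A minimal sketch of data that hits this path (illustrative only, not part of this patch; assumes spark-mllib on the classpath):
    
    ```scala
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // The "features" column is backed by VectorUDT; its catalogString renders as
    // struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,
    // i.e. the type name reported in the error above.
    val df = spark.createDataFrame(Seq(
      (0L, Vectors.dense(1.0, 2.0)),
      (1L, Vectors.sparse(2, Array(0), Array(3.0)))
    )).toDF("id", "features")
    df.createOrReplaceTempView("vectors")
    // Before this change, fetching "SELECT features FROM vectors" through the
    // Kyuubi Spark engine failed while converting the UDT column to a Thrift type.
    ```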
    
    After this PR, the same query's result is retrieved successfully:
    Screenshot (after): https://github.com/apache/kyuubi/assets/6757692/2d9f4f0b-9ac4-48e9-9e6a-4c0f1616edf9
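    
    With the fix, a UDT column can also be fetched from a client over JDBC. A rough sketch in the spirit of the SparkUDTOperationSuite added below (the connection URL and the `exampleValueUdf` UDF are assumptions for illustration, not shipped with Kyuubi):
    
    ```scala
    import java.sql.DriverManager
    
    // Assumes a reachable Kyuubi server and the Kyuubi Hive JDBC driver on the classpath.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
    try {
      val stmt = conn.createStatement()
      // exampleValueUdf stands in for a UDF returning a UDT-backed value, as
      // registered in the new test suite.
      val rs = stmt.executeQuery("SELECT exampleValueUdf(1.0)")
      // The Kyuubi Hive JDBC driver now maps USER_DEFINED_TYPE to java.sql.Types.OTHER
      // and reports the column type name as "user_defined".
      println(rs.getMetaData.getColumnTypeName(1))
      while (rs.next()) {
        // UDT values come back in their string form, e.g. "ExampleValue(1.0)".
        println(rs.getString(1))
      }
    } finally {
      conn.close()
    }
    ```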
    
    Closes #6207 from turboFei/udt.
    
    b492568e3 [Wang, Fei] Revert "miss union type"
    39ac1c42f [Wang, Fei] miss union type
    8c32f54af [Wang, Fei] comment
    00a469855 [Wang, Fei] getColumnTypeName
    d7d291652 [Wang, Fei] ut
    edce5cf1f [Wang, Fei] support udt
    
    Authored-by: Wang, Fei <fw...@ebay.com>
    Signed-off-by: Wang, Fei <fw...@ebay.com>
    (cherry picked from commit a1b95410544fe2c2f0e9691e1999a522564403df)
    Signed-off-by: Wang, Fei <fw...@ebay.com>
---
 .../kyuubi/engine/spark/schema/SchemaHelper.scala  |  4 +-
 .../spark/sql/kyuubi/SparkDataTypeHelper.scala     | 29 +++++++++++++
 .../org/apache/spark/kyuubi/ExampleValueUDT.scala  | 44 ++++++++++++++++++++
 .../spark/kyuubi/SparkUDTOperationSuite.scala      | 47 ++++++++++++++++++++++
 .../org/apache/kyuubi/jdbc/hive/JdbcColumn.java    |  3 ++
 5 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
index 3beab47a5..bb7284d6e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
@@ -22,6 +22,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift._
+import org.apache.spark.sql.kyuubi.SparkDataTypeHelper
 import org.apache.spark.sql.types._
 
 object SchemaHelper {
@@ -64,7 +65,8 @@ object SchemaHelper {
     case _: ArrayType => TTypeId.ARRAY_TYPE
     case _: MapType => TTypeId.MAP_TYPE
     case _: StructType => TTypeId.STRUCT_TYPE
-    // TODO: it is private now, case udt: UserDefinedType => TTypeId.USER_DEFINED_TYPE
+    // SPARK-7768 (fixed in 3.2.0) promoted UserDefinedType to DeveloperApi
+    case _ if SparkDataTypeHelper.isUserDefinedType(typ) => TTypeId.USER_DEFINED_TYPE
     case other =>
       throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}")
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
new file mode 100644
index 000000000..11f8be076
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kyuubi
+
+import org.apache.spark.sql.types.{DataType, UserDefinedType}
+
+object SparkDataTypeHelper {
+  def isUserDefinedType(typ: DataType): Boolean = {
+    typ match {
+      case _: UserDefinedType[_] => true
+      case _ => false
+    }
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
new file mode 100644
index 000000000..bfac1a153
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, UserDefinedType}
+
+case class ExampleValue(v: Double)
+
+class ExampleValueUDT extends UserDefinedType[ExampleValue] {
+
+  override def sqlType: DataType = ArrayType(DoubleType, false)
+
+  override def pyUDT: String = "pyspark.testing.ExampleValueUDT"
+
+  override def serialize(obj: ExampleValue): GenericArrayData = {
+    new GenericArrayData(Array[Any](obj.v))
+  }
+
+  override def deserialize(datum: Any): ExampleValue = {
+    datum match {
+      case values: ArrayData => new ExampleValue(values.getDouble(0))
+    }
+  }
+
+  override def userClass: Class[ExampleValue] = classOf[ExampleValue]
+
+  override private[spark] def asNullable: ExampleValueUDT = this
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
new file mode 100644
index 000000000..68da0b5b1
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.types.UDTRegistration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class SparkUDTOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
+  override def withKyuubiConf: Map[String, String] = Map(
+    KyuubiConf.ENGINE_SINGLE_SPARK_SESSION.key -> "true")
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("retrieve UserDefinedType result") {
+    UDTRegistration.register(classOf[ExampleValue].getName, classOf[ExampleValueUDT].getName)
+    spark.udf.register(
+      "exampleValueUdf",
+      (param: Double) =>
+        {
+          ExampleValue(param)
+        }: ExampleValue)
+
+    withJdbcStatement() { stmt =>
+      val result = stmt.executeQuery("SELECT exampleValueUdf(1.0)")
+      assert(result.next())
+      assert(result.getString(1) == ExampleValue(1.0).toString)
+    }
+  }
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
index 365fc1d3e..858f330a6 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
@@ -178,6 +178,7 @@ public class JdbcColumn {
       case INTERVAL_YEAR_MONTH_TYPE:
       case INTERVAL_DAY_TIME_TYPE:
       case UNION_TYPE:
+      case USER_DEFINED_TYPE:
         return OTHER;
       case DECIMAL_TYPE:
         return DECIMAL;
@@ -240,6 +241,8 @@ public class JdbcColumn {
         return "struct";
       case NULL_TYPE:
         return "void";
+      case USER_DEFINED_TYPE:
+        return "user_defined";
       default:
         throw new KyuubiSQLException("Invalid column type: " + type);
     }