You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/03/26 04:52:41 UTC
(kyuubi) branch branch-1.8 updated: [KYUUBI #6207] Support to retrieve Spark UserDefinedType result
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 89246c5d9 [KYUUBI #6207] Support to retrieve Spark UserDefinedType result
89246c5d9 is described below
commit 89246c5d910ac884755ab1e5199426013ab60342
Author: Wang, Fei <fw...@ebay.com>
AuthorDate: Mon Mar 25 21:47:54 2024 -0700
[KYUUBI #6207] Support to retrieve Spark UserDefinedType result
This pull request fixes the issue below:
```
24/03/25 00:47:10 ERROR SparkTBinaryFrontendService: Error getting result set metadata:
java.lang.IllegalArgumentException: Unrecognized type name: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
```
<img width="1567" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/9067d2d2-06d9-4937-b328-71434def34fd">
Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
After this PR:
<img width="1728" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/2d9f4f0b-9ac4-48e9-9e6a-4c0f1616edf9">
---
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6207 from turboFei/udt.
Closes #6207
b492568e3 [Wang, Fei] Revert "miss union type"
39ac1c42f [Wang, Fei] miss union type
8c32f54af [Wang, Fei] comment
00a469855 [Wang, Fei] getColumnTypeName
d7d291652 [Wang, Fei] ut
edce5cf1f [Wang, Fei] support udt
Authored-by: Wang, Fei <fw...@ebay.com>
Signed-off-by: Wang, Fei <fw...@ebay.com>
(cherry picked from commit a1b95410544fe2c2f0e9691e1999a522564403df)
Signed-off-by: Wang, Fei <fw...@ebay.com>
---
.../kyuubi/engine/spark/schema/SchemaHelper.scala | 4 +-
.../spark/sql/kyuubi/SparkDataTypeHelper.scala | 29 +++++++++++++
.../org/apache/spark/kyuubi/ExampleValueUDT.scala | 44 ++++++++++++++++++++
.../spark/kyuubi/SparkUDTOperationSuite.scala | 47 ++++++++++++++++++++++
.../org/apache/kyuubi/jdbc/hive/JdbcColumn.java | 3 ++
5 files changed, 126 insertions(+), 1 deletion(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
index 3beab47a5..bb7284d6e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
@@ -22,6 +22,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
+import org.apache.spark.sql.kyuubi.SparkDataTypeHelper
import org.apache.spark.sql.types._
object SchemaHelper {
@@ -64,7 +65,8 @@ object SchemaHelper {
case _: ArrayType => TTypeId.ARRAY_TYPE
case _: MapType => TTypeId.MAP_TYPE
case _: StructType => TTypeId.STRUCT_TYPE
- // TODO: it is private now, case udt: UserDefinedType => TTypeId.USER_DEFINED_TYPE
+ // SPARK-7768(fixed in 3.2.0) promoted UserDefinedType to DeveloperApi
+ case _ if SparkDataTypeHelper.isUserDefinedType(typ) => TTypeId.USER_DEFINED_TYPE
case other =>
throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}")
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
new file mode 100644
index 000000000..11f8be076
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kyuubi
+
+import org.apache.spark.sql.types.{DataType, UserDefinedType}
+
+object SparkDataTypeHelper {
+ def isUserDefinedType(typ: DataType): Boolean = {
+ typ match {
+ case _: UserDefinedType[_] => true
+ case _ => false
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
new file mode 100644
index 000000000..bfac1a153
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, UserDefinedType}
+
+case class ExampleValue(v: Double)
+
+class ExampleValueUDT extends UserDefinedType[ExampleValue] {
+
+ override def sqlType: DataType = ArrayType(DoubleType, false)
+
+ override def pyUDT: String = "pyspark.testing.ExampleValueUDT"
+
+ override def serialize(obj: ExampleValue): GenericArrayData = {
+ new GenericArrayData(Array[Any](obj.v))
+ }
+
+ override def deserialize(datum: Any): ExampleValue = {
+ datum match {
+ case values: ArrayData => new ExampleValue(values.getDouble(0))
+ }
+ }
+
+ override def userClass: Class[ExampleValue] = classOf[ExampleValue]
+
+ override private[spark] def asNullable: ExampleValueUDT = this
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
new file mode 100644
index 000000000..68da0b5b1
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.types.UDTRegistration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class SparkUDTOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
+ override def withKyuubiConf: Map[String, String] = Map(
+ KyuubiConf.ENGINE_SINGLE_SPARK_SESSION.key -> "true")
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("retrieve UserDefinedType result") {
+ UDTRegistration.register(classOf[ExampleValue].getName, classOf[ExampleValueUDT].getName)
+ spark.udf.register(
+ "exampleValueUdf",
+ (param: Double) =>
+ {
+ ExampleValue(param)
+ }: ExampleValue)
+
+ withJdbcStatement() { stmt =>
+ val result = stmt.executeQuery("SELECT exampleValueUdf(1.0)")
+ assert(result.next())
+ assert(result.getString(1) == ExampleValue(1.0).toString)
+ }
+ }
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
index 365fc1d3e..858f330a6 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
@@ -178,6 +178,7 @@ public class JdbcColumn {
case INTERVAL_YEAR_MONTH_TYPE:
case INTERVAL_DAY_TIME_TYPE:
case UNION_TYPE:
+ case USER_DEFINED_TYPE:
return OTHER;
case DECIMAL_TYPE:
return DECIMAL;
@@ -240,6 +241,8 @@ public class JdbcColumn {
return "struct";
case NULL_TYPE:
return "void";
+ case USER_DEFINED_TYPE:
+ return "user_defined";
default:
throw new KyuubiSQLException("Invalid column type: " + type);
}