You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/04/19 08:03:08 UTC
[spark] branch master updated: [SPARK-31937][SQL] Support
processing ArrayType/MapType/StructType data using no-serde mode script
transform
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a74f601 [SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform
a74f601 is described below
commit a74f6010400eba9daca7a911627e95dd31acdc6c
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Apr 19 17:02:32 2021 +0900
[SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform
### What changes were proposed in this pull request?
Support no-serde mode script transform using ArrayType/MapType/StructType data.
### Why are the changes needed?
Allow users to process array/map/struct data.
### Does this PR introduce _any_ user-facing change?
Yes, users can process array/map/struct data in script transform `no-serde` mode
### How was this patch tested?
Added UT
Closes #30957 from AngersZhuuuu/SPARK-31937.
Lead-authored-by: Angerszhuuuu <an...@gmail.com>
Co-authored-by: angerszhu <an...@gmail.com>
Co-authored-by: AngersZhuuuu <an...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
docs/sql-migration-guide.md | 2 +
.../execution/BaseScriptTransformationExec.scala | 16 +++++-
.../test/resources/sql-tests/inputs/transform.sql | 2 +-
.../resources/sql-tests/results/transform.sql.out | 10 ++--
.../execution/BaseScriptTransformationSuite.scala | 64 ++++++++++++++++++++--
.../execution/SparkScriptTransformationSuite.scala | 42 +-------------
6 files changed, 83 insertions(+), 53 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 4f18ff3..e9cf49a 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -79,6 +79,8 @@ license: |
- In Spark 3.2, `TRANSFORM` operator can't support alias in inputs. In Spark 3.1 and earlier, we can write script transform like `SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL`.
+ - In Spark 3.2, the `TRANSFORM` operator can support `ArrayType/MapType/StructType` without Hive SerDe. In this mode, we use `StructsToJson` to convert an `ArrayType/MapType/StructType` column to `STRING` and use `JsonToStructs` to parse a `STRING` into `ArrayType/MapType/StructType`. In Spark 3.1, Spark only supported casting an `ArrayType/MapType/StructType` column to `STRING`, but did not support parsing a `STRING` into `ArrayType/MapType/StructType` output columns.
+
## Upgrading from Spark SQL 3.0 to 3.1
- In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggrega [...]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 669b90f..4d594f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -47,7 +47,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
def ioschema: ScriptTransformationIOSchema
protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
- input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
+ input.map { in =>
+ in.dataType match {
+ case _: ArrayType | _: MapType | _: StructType =>
+ new StructsToJson(ioschema.inputSerdeProps.toMap, in)
+ .withTimeZone(conf.sessionLocalTimeZone)
+ case _ => Cast(in, StringType).withTimeZone(conf.sessionLocalTimeZone)
+ }
+ }
}
override def producedAttributes: AttributeSet = outputSet -- inputSet
@@ -220,6 +227,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
case CalendarIntervalType => wrapperConvertException(
data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
converter)
+ case _: ArrayType | _: MapType | _: StructType =>
+ val complexTypeFactory = JsonToStructs(attr.dataType,
+ ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))
+ wrapperConvertException(data =>
+ complexTypeFactory.nullSafeEval(UTF8String.fromString(data)), any => any)
case udt: UserDefinedType[_] =>
wrapperConvertException(data => udt.deserialize(data), converter)
case dt =>
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index e3ae34e..7419ca1 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -264,7 +264,7 @@ WHERE a <= 4
WINDOW w AS (PARTITION BY b ORDER BY a);
SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
- USING 'cat' AS (a, b, c, d, e)
+ USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
FROM script_trans
LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index c20ec4c..1d7e9cd 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -495,7 +495,7 @@ struct<a:string,b:string,c:string>
-- !query
SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
- USING 'cat' AS (a, b, c, d, e)
+ USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
FROM script_trans
LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
@@ -503,11 +503,11 @@ WHERE a <= 4
GROUP BY b, myCol, myCol2
HAVING max(a) > 1
-- !query schema
-struct<a:string,b:string,c:string,d:string,e:string>
+struct<a:string,b:string,c:string,d:array<int>,e:string>
-- !query output
-5 4 6 [1, 2, 3] 1
-5 4 6 [1, 2, 3] 2
-5 4 6 [1, 2, 3] 3
+5 4 6 [1,2,3] 1
+5 4 6 [1,2,3] 2
+5 4 6 [1,2,3] 3
-- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index 2011d05..67be671 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -302,14 +302,16 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
script = "cat",
output = Seq(
AttributeReference("a", CalendarIntervalType)(),
- AttributeReference("b", StringType)(),
- AttributeReference("c", StringType)(),
- AttributeReference("d", StringType)(),
+ AttributeReference("b", ArrayType(IntegerType))(),
+ AttributeReference("c", MapType(StringType, IntegerType))(),
+ AttributeReference("d", StructType(
+ Array(StructField("_1", IntegerType),
+ StructField("_2", IntegerType))))(),
AttributeReference("e", new SimpleTupleUDT)()),
child = child,
ioschema = defaultIOSchema
),
- df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
+ df.select('a, 'b, 'c, 'd, 'e).collect())
}
}
@@ -471,6 +473,60 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
}
}
+ test("SPARK-31936: Script transform support ArrayType/MapType/StructType (no serde)") {
+ assume(TestUtils.testCommandAvailable("python"))
+ withTempView("v") {
+ val df = Seq(
+ (Array(0, 1, 2), Array(Array(0, 1), Array(2)),
+ Map("a" -> 1), Map("b" -> Array("a", "b"))),
+ (Array(3, 4, 5), Array(Array(3, 4), Array(5)),
+ Map("b" -> 2), Map("c" -> Array("c", "d"))),
+ (Array(6, 7, 8), Array(Array(6, 7), Array(8)),
+ Map("c" -> 3), Map("d" -> Array("e", "f")))
+ ).toDF("a", "b", "c", "d")
+ .select('a, 'b, 'c, 'd,
+ struct('a, 'b).as("e"),
+ struct('a, 'd).as("f"),
+ struct(struct('a, 'b), struct('a, 'd)).as("g")
+ )
+
+ checkAnswer(
+ df,
+ (child: SparkPlan) => createScriptTransformationExec(
+ input = Seq(
+ df.col("a").expr,
+ df.col("b").expr,
+ df.col("c").expr,
+ df.col("d").expr,
+ df.col("e").expr,
+ df.col("f").expr,
+ df.col("g").expr),
+ script = "cat",
+ output = Seq(
+ AttributeReference("a", ArrayType(IntegerType))(),
+ AttributeReference("b", ArrayType(ArrayType(IntegerType)))(),
+ AttributeReference("c", MapType(StringType, IntegerType))(),
+ AttributeReference("d", MapType(StringType, ArrayType(StringType)))(),
+ AttributeReference("e", StructType(
+ Array(StructField("a", ArrayType(IntegerType)),
+ StructField("b", ArrayType(ArrayType(IntegerType))))))(),
+ AttributeReference("f", StructType(
+ Array(StructField("a", ArrayType(IntegerType)),
+ StructField("d", MapType(StringType, ArrayType(StringType))))))(),
+ AttributeReference("g", StructType(
+ Array(StructField("col1", StructType(
+ Array(StructField("a", ArrayType(IntegerType)),
+ StructField("b", ArrayType(ArrayType(IntegerType)))))),
+ StructField("col2", StructType(
+ Array(StructField("a", ArrayType(IntegerType)),
+ StructField("d", MapType(StringType, ArrayType(StringType)))))))))()),
+ child = child,
+ ioschema = defaultIOSchema
+ ),
+ df.select('a, 'b, 'c, 'd, 'e, 'f, 'g).collect())
+ }
+ }
+
test("SPARK-33934: Add SparkFile's root dir to env property PATH") {
assume(TestUtils.testCommandAvailable("python"))
val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
index 6ff7c5d6..e5aa3bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.TestUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.test.SharedSparkSession
@@ -59,44 +59,4 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with
assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
}
}
-
- test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
- "as output data type (no serde)") {
- assume(TestUtils.testCommandAvailable("/bin/bash"))
- // check for ArrayType
- val e1 = intercept[SparkException] {
- sql(
- """
- |SELECT TRANSFORM(a)
- |USING 'cat' AS (a array<int>)
- |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
- """.stripMargin).collect()
- }.getMessage
- assert(e1.contains("SparkScriptTransformation without serde does not support" +
- " ArrayType as output data type"))
-
- // check for MapType
- val e2 = intercept[SparkException] {
- sql(
- """
- |SELECT TRANSFORM(b)
- |USING 'cat' AS (b map<int, string>)
- |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
- """.stripMargin).collect()
- }.getMessage
- assert(e2.contains("SparkScriptTransformation without serde does not support" +
- " MapType as output data type"))
-
- // check for StructType
- val e3 = intercept[SparkException] {
- sql(
- """
- |SELECT TRANSFORM(c)
- |USING 'cat' AS (c struct<col1:int, col2:string>)
- |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
- """.stripMargin).collect()
- }.getMessage
- assert(e3.contains("SparkScriptTransformation without serde does not support" +
- " StructType as output data type"))
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org