Posted to commits@spark.apache.org by gu...@apache.org on 2021/04/19 08:03:08 UTC

[spark] branch master updated: [SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a74f601  [SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform
a74f601 is described below

commit a74f6010400eba9daca7a911627e95dd31acdc6c
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Apr 19 17:02:32 2021 +0900

    [SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform
    
    ### What changes were proposed in this pull request?
    Support processing ArrayType/MapType/StructType data in no-serde mode script transform.
    
    ### Why are the changes needed?
    To let users process array/map/struct data in script transforms.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, users can now process array/map/struct data in script transform `no-serde` mode; see the sketch below.
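    
    A minimal usage sketch: the view name `complex_tbl` and the data are illustrative, and it assumes an active `SparkSession` named `spark` in a session where `TRANSFORM` runs without a Hive SerDe.
    
        import spark.implicits._
    
        // Pipe an array column through `cat`. Without a Hive SerDe, the array is
        // serialized to a JSON string for the script and parsed back into
        // ARRAY<INT> according to the declared output schema.
        Seq((1, Array(1, 2, 3)), (2, Array(4, 5, 6)))
          .toDF("id", "arr")
          .createOrReplaceTempView("complex_tbl")
    
        spark.sql(
          """
            |SELECT TRANSFORM(id, arr)
            |  USING 'cat' AS (id INT, arr ARRAY<INT>)
            |FROM complex_tbl
          """.stripMargin).show()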
    
    ### How was this patch tested?
    Added UT
    
    Closes #30957 from AngersZhuuuu/SPARK-31937.
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: angerszhu <an...@gmail.com>
    Co-authored-by: AngersZhuuuu <an...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 docs/sql-migration-guide.md                        |  2 +
 .../execution/BaseScriptTransformationExec.scala   | 16 +++++-
 .../test/resources/sql-tests/inputs/transform.sql  |  2 +-
 .../resources/sql-tests/results/transform.sql.out  | 10 ++--
 .../execution/BaseScriptTransformationSuite.scala  | 64 ++++++++++++++++++++--
 .../execution/SparkScriptTransformationSuite.scala | 42 +-------------
 6 files changed, 83 insertions(+), 53 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 4f18ff3..e9cf49a 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -79,6 +79,8 @@ license: |
 
   - In Spark 3.2, `TRANSFORM` operator can't support alias in inputs. In Spark 3.1 and earlier, we can write script transform like `SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL`.
 
+  - In Spark 3.2, the `TRANSFORM` operator supports `ArrayType/MapType/StructType` without a Hive SerDe. In this mode, Spark uses `StructsToJson` to convert `ArrayType/MapType/StructType` columns to `STRING`, and `JsonToStructs` to parse `STRING` back into `ArrayType/MapType/StructType` output columns. In Spark 3.1, Spark only supports casting `ArrayType/MapType/StructType` columns to `STRING`, but cannot parse `STRING` into `ArrayType/MapType/StructType` output columns.
+
 ## Upgrading from Spark SQL 3.0 to 3.1
 
   - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggrega [...]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 669b90f..4d594f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -47,7 +47,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   def ioschema: ScriptTransformationIOSchema
 
   protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
-    input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
+    input.map { in =>
+      in.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          new StructsToJson(ioschema.inputSerdeProps.toMap, in)
+            .withTimeZone(conf.sessionLocalTimeZone)
+        case _ => Cast(in, StringType).withTimeZone(conf.sessionLocalTimeZone)
+      }
+    }
   }
 
   override def producedAttributes: AttributeSet = outputSet -- inputSet
@@ -220,6 +227,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       case CalendarIntervalType => wrapperConvertException(
         data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
         converter)
+      case _: ArrayType | _: MapType | _: StructType =>
+        val complexTypeFactory = JsonToStructs(attr.dataType,
+          ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))
+        wrapperConvertException(data =>
+          complexTypeFactory.nullSafeEval(UTF8String.fromString(data)), any => any)
       case udt: UserDefinedType[_] =>
         wrapperConvertException(data => udt.deserialize(data), converter)
       case dt =>
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index e3ae34e..7419ca1 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -264,7 +264,7 @@ WHERE a <= 4
 WINDOW w AS (PARTITION BY b ORDER BY a);
 
 SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-  USING 'cat' AS (a, b, c, d, e)
+  USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
 FROM script_trans
 LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
 LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index c20ec4c..1d7e9cd 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -495,7 +495,7 @@ struct<a:string,b:string,c:string>
 
 -- !query
 SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-  USING 'cat' AS (a, b, c, d, e)
+  USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
 FROM script_trans
 LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
 LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
@@ -503,11 +503,11 @@ WHERE a <= 4
 GROUP BY b, myCol, myCol2
 HAVING max(a) > 1
 -- !query schema
-struct<a:string,b:string,c:string,d:string,e:string>
+struct<a:string,b:string,c:string,d:array<int>,e:string>
 -- !query output
-5	4	6	[1, 2, 3]	1
-5	4	6	[1, 2, 3]	2
-5	4	6	[1, 2, 3]	3
+5	4	6	[1,2,3]	1
+5	4	6	[1,2,3]	2
+5	4	6	[1,2,3]	3
 
 
 -- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index 2011d05..67be671 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -302,14 +302,16 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
           script = "cat",
           output = Seq(
             AttributeReference("a", CalendarIntervalType)(),
-            AttributeReference("b", StringType)(),
-            AttributeReference("c", StringType)(),
-            AttributeReference("d", StringType)(),
+            AttributeReference("b", ArrayType(IntegerType))(),
+            AttributeReference("c", MapType(StringType, IntegerType))(),
+            AttributeReference("d", StructType(
+              Array(StructField("_1", IntegerType),
+                StructField("_2", IntegerType))))(),
             AttributeReference("e", new SimpleTupleUDT)()),
           child = child,
           ioschema = defaultIOSchema
         ),
-        df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
+        df.select('a, 'b, 'c, 'd, 'e).collect())
     }
   }
 
@@ -471,6 +473,60 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
     }
   }
 
+  test("SPARK-31936: Script transform support ArrayType/MapType/StructType (no serde)") {
+    assume(TestUtils.testCommandAvailable("python"))
+    withTempView("v") {
+      val df = Seq(
+        (Array(0, 1, 2), Array(Array(0, 1), Array(2)),
+          Map("a" -> 1), Map("b" -> Array("a", "b"))),
+        (Array(3, 4, 5), Array(Array(3, 4), Array(5)),
+          Map("b" -> 2), Map("c" -> Array("c", "d"))),
+        (Array(6, 7, 8), Array(Array(6, 7), Array(8)),
+          Map("c" -> 3), Map("d" -> Array("e", "f")))
+      ).toDF("a", "b", "c", "d")
+        .select('a, 'b, 'c, 'd,
+          struct('a, 'b).as("e"),
+          struct('a, 'd).as("f"),
+          struct(struct('a, 'b), struct('a, 'd)).as("g")
+        )
+
+      checkAnswer(
+        df,
+        (child: SparkPlan) => createScriptTransformationExec(
+          input = Seq(
+            df.col("a").expr,
+            df.col("b").expr,
+            df.col("c").expr,
+            df.col("d").expr,
+            df.col("e").expr,
+            df.col("f").expr,
+            df.col("g").expr),
+          script = "cat",
+          output = Seq(
+            AttributeReference("a", ArrayType(IntegerType))(),
+            AttributeReference("b", ArrayType(ArrayType(IntegerType)))(),
+            AttributeReference("c", MapType(StringType, IntegerType))(),
+            AttributeReference("d", MapType(StringType, ArrayType(StringType)))(),
+            AttributeReference("e", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("b", ArrayType(ArrayType(IntegerType))))))(),
+            AttributeReference("f", StructType(
+              Array(StructField("a", ArrayType(IntegerType)),
+                StructField("d", MapType(StringType, ArrayType(StringType))))))(),
+            AttributeReference("g", StructType(
+              Array(StructField("col1", StructType(
+                Array(StructField("a", ArrayType(IntegerType)),
+                  StructField("b", ArrayType(ArrayType(IntegerType)))))),
+                StructField("col2", StructType(
+                  Array(StructField("a", ArrayType(IntegerType)),
+                    StructField("d", MapType(StringType, ArrayType(StringType)))))))))()),
+          child = child,
+          ioschema = defaultIOSchema
+        ),
+        df.select('a, 'b, 'c, 'd, 'e, 'f, 'g).collect())
+    }
+  }
+
   test("SPARK-33934: Add SparkFile's root dir to env property PATH") {
     assume(TestUtils.testCommandAvailable("python"))
     val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
index 6ff7c5d6..e5aa3bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.test.SharedSparkSession
@@ -59,44 +59,4 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with
       assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
     }
   }
-
-  test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
-    "as output data type (no serde)") {
-    assume(TestUtils.testCommandAvailable("/bin/bash"))
-    // check for ArrayType
-    val e1 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(a)
-          |USING 'cat' AS (a array<int>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e1.contains("SparkScriptTransformation without serde does not support" +
-      " ArrayType as output data type"))
-
-    // check for MapType
-    val e2 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(b)
-          |USING 'cat' AS (b map<int, string>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e2.contains("SparkScriptTransformation without serde does not support" +
-      " MapType as output data type"))
-
-    // check for StructType
-    val e3 = intercept[SparkException] {
-      sql(
-        """
-          |SELECT TRANSFORM(c)
-          |USING 'cat' AS (c struct<col1:int, col2:string>)
-          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
-        """.stripMargin).collect()
-    }.getMessage
-    assert(e3.contains("SparkScriptTransformation without serde does not support" +
-      " StructType as output data type"))
-  }
 }
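
For reference, a minimal sketch of the JSON round trip described in the migration-guide entry above, using the public `to_json`/`from_json` functions that wrap the `StructsToJson`/`JsonToStructs` expressions used by `BaseScriptTransformationExec`; the column names and data are illustrative, and it assumes an active `SparkSession` named `spark`.

    import org.apache.spark.sql.functions.{from_json, to_json}
    import org.apache.spark.sql.types.{ArrayType, IntegerType}
    import spark.implicits._

    // Input side: a complex column is rendered as a JSON string before it is
    // written to the script's stdin (mirrors StructsToJson in
    // inputExpressionsWithoutSerde above).
    val jsonIn = Seq(Tuple1(Array(1, 2, 3))).toDF("arr")
      .select(to_json($"arr").as("arr_json"))   // value: "[1,2,3]"

    // Output side: the script's stdout field is parsed back into the declared
    // complex type (mirrors JsonToStructs in the output converter above).
    val parsedOut = jsonIn.select(
      from_json($"arr_json", ArrayType(IntegerType)).as("arr"))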
