You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/08/20 13:46:23 UTC

[spark] branch branch-3.0 updated: [SPARK-32608][SQL][3.0] Script Transform ROW FORMAT DELIMIT value should format value

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 87d7ab6  [SPARK-32608][SQL][3.0] Script Transform ROW FORMAT DELIMIT value should format value
87d7ab6 is described below

commit 87d7ab6c6e96db2bd019d743d9459d6f00240829
Author: angerszhu <an...@gmail.com>
AuthorDate: Thu Aug 20 13:43:15 2020 +0000

    [SPARK-32608][SQL][3.0] Script Transform ROW FORMAT DELIMIT value should format value
    
    ### What changes were proposed in this pull request?
    For SQL
    ```
    SELECT TRANSFORM(a, b, c)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      NULL DEFINED AS 'null'
      USING 'cat' AS (a, b, c)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      NULL DEFINED AS 'NULL'
    FROM testData
    ```
    The correct
    
    TOK_TABLEROWFORMATFIELD should be `, `nut actually ` ','`
    
    TOK_TABLEROWFORMATLINES should be `\n`  but actually` '\n'`
    
    ### Why are the changes needed?
    Fix string value format
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #29487 from AngersZhuuuu/SPARK-32608-3.0.
    
    Authored-by: angerszhu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/parser/ParserUtils.scala    |  5 ++
 .../spark/sql/execution/SparkSqlParser.scala       |  7 --
 .../spark/sql/execution/SparkSqlParserSuite.scala  | 45 ++++++++++++-
 .../hive/execution/ScriptTransformationSuite.scala | 77 ++++++++++++++++++++--
 4 files changed, 118 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index a377969..f2dab94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -83,6 +83,11 @@ object ParserUtils {
     node.getText.slice(1, node.getText.size - 1)
   }
 
+  /** Collect the entries if any. */
+  def entry(key: String, value: Token): Seq[(String, String)] = {
+    Option(value).toSeq.map(x => key -> string(x))
+  }
+
   /** Get the origin (line and position) of the token. */
   def position(token: Token): Origin = {
     val opt = Option(token)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index aa139cb..44069f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -587,10 +587,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    */
   override def visitRowFormatDelimited(
       ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
-    // Collect the entries if any.
-    def entry(key: String, value: Token): Seq[(String, String)] = {
-      Option(value).toSeq.map(x => key -> string(x))
-    }
     // TODO we need proper support for the NULL format.
     val entries =
       entry("field.delim", ctx.fieldsTerminatedBy) ++
@@ -689,9 +685,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         // expects a seq of pairs in which the old parsers' token names are used as keys.
         // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
         // retrieving the key value pairs ourselves.
-        def entry(key: String, value: Token): Seq[(String, String)] = {
-          Option(value).map(t => key -> t.getText).toSeq
-        }
         val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
           entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
           entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 06574a9..6a58c8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -21,8 +21,9 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Concat, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, RefreshResource}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -251,4 +252,44 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("ADD FILE /path with space/abc.txt", AddFileCommand("/path with space/abc.txt"))
     assertEqual("ADD JAR /path with space/abc.jar", AddJarCommand("/path with space/abc.jar"))
   }
+
+  test("SPARK-32608: script transform with row format delimit") {
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY ','
+        |  COLLECTION ITEMS TERMINATED BY '#'
+        |  MAP KEYS TERMINATED BY '@'
+        |  LINES TERMINATED BY '\n'
+        |  NULL DEFINED AS 'null'
+        |  USING 'cat' AS (a, b, c)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY ','
+        |  COLLECTION ITEMS TERMINATED BY '#'
+        |  MAP KEYS TERMINATED BY '@'
+        |  LINES TERMINATED BY '\n'
+        |  NULL DEFINED AS 'NULL'
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(
+          Seq(("TOK_TABLEROWFORMATFIELD", ","),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "#"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "@"),
+            ("TOK_TABLEROWFORMATLINES", "\n"),
+            ("TOK_TABLEROWFORMATNULL", "null")),
+          Seq(("TOK_TABLEROWFORMATFIELD", ","),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "#"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "@"),
+            ("TOK_TABLEROWFORMATLINES", "\n"),
+            ("TOK_TABLEROWFORMATNULL", "NULL")), None, None,
+          List.empty, List.empty, None, None, false)))
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index b97eb86..15a932f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -61,6 +62,14 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes
 
   private val uncaughtExceptionHandler = new TestUncaughtExceptionHandler
 
+  // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
+  // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
+  val decimalToString: Column => Column = if (HiveUtils.isHive23) {
+    c => c.cast("string")
+  } else {
+    c => c.cast("decimal(1, 0)").cast("string")
+  }
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler
@@ -212,13 +221,6 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes
           |FROM v
         """.stripMargin)
 
-      // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
-      // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
-      val decimalToString: Column => Column = if (HiveUtils.isHive23) {
-        c => c.cast("string")
-      } else {
-        c => c.cast("decimal(1, 0)").cast("string")
-      }
       checkAnswer(query, identity, df.select(
         'a.cast("string"),
         'b.cast("string"),
@@ -263,6 +265,67 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes
     assert(e.getMessage.contains("Subprocess exited with status"))
     assert(uncaughtExceptionHandler.exception.isEmpty)
   }
+
+
+  test("SPARK-32608: Script Transform ROW FORMAT DELIMIT value should format value") {
+    withTempView("v") {
+      val df = Seq(
+        (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)),
+        (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)),
+        (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3))
+      ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18)
+      df.createTempView("v")
+
+      // input/output with same delimit
+      checkAnswer(
+        sql(
+          s"""
+             |SELECT TRANSFORM(a, b, c, d, cast(e as string))
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  COLLECTION ITEMS TERMINATED BY '#'
+             |  MAP KEYS TERMINATED BY '@'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'null'
+             |  USING 'cat' AS (a, b, c, d, e)
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  COLLECTION ITEMS TERMINATED BY '#'
+             |  MAP KEYS TERMINATED BY '@'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'NULL'
+             |FROM v
+        """.stripMargin), identity, df.select(
+          'a.cast("string"),
+          'b.cast("string"),
+          'c.cast("string"),
+          decimalToString('d),
+          'e.cast("string")).collect())
+
+      // input/output with different delimit and show result
+      checkAnswer(
+        sql(
+          s"""
+             |SELECT TRANSFORM(a, b, c, d, cast(e as string))
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'null'
+             |  USING 'cat' AS (value)
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY '&'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'NULL'
+             |FROM v
+        """.stripMargin), identity, df.select(
+          concat_ws(",",
+            'a.cast("string"),
+            'b.cast("string"),
+            'c.cast("string"),
+            decimalToString('d),
+            'e.cast("string"))).collect())
+    }
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org