You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/22 09:29:14 UTC

[spark] branch master updated: [SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb09bd2  [SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
bb09bd2 is described below

commit bb09bd2e2dc1201df70b5ffbfaf7407c24b7fdd9
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Thu Jul 22 17:28:37 2021 +0800

    [SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
    
    ### What changes were proposed in this pull request?
    SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
    ![image](https://user-images.githubusercontent.com/46485123/125775377-611d4f06-f9e5-453a-990d-5a0018774f43.png)
    ![image](https://user-images.githubusercontent.com/46485123/125775387-6618bd0c-78d8-4457-bcc2-12dd70522946.png)
    
    ### Why are the changes needed?
    Keep consistency with Hive
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #33363 from AngersZhuuuu/SPARK-36156.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/BaseScriptTransformationExec.scala   | 30 +++++---
 .../test/resources/sql-tests/inputs/transform.sql  | 43 +++++++++++
 .../resources/sql-tests/results/transform.sql.out  | 88 ++++++++++++++++++++--
 3 files changed, 147 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index e249cd6..fc3a124 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -118,7 +118,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       val processRowWithoutSerde = if (!ioschema.schemaLess) {
         prevLine: String =>
           new GenericInternalRow(
-            prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
+            prevLine.split(outputRowFormat, -1).padTo(outputFieldWriters.size, null)
               .zip(outputFieldWriters)
               .map { case (data, writer) => writer(data) })
       } else {
@@ -129,7 +129,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
         val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
         prevLine: String =>
           new GenericInternalRow(
-            prevLine.split(outputRowFormat).slice(0, 2).padTo(2, null)
+            prevLine.split(outputRowFormat, -1).slice(0, 2).padTo(2, null)
               .map(kvWriter))
       }
 
@@ -247,10 +247,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   private val wrapperConvertException: (String => Any, Any => Any) => String => Any =
     (f: String => Any, converter: Any => Any) =>
       (data: String) => converter {
-        try {
-          f(data)
-        } catch {
-          case NonFatal(_) => null
+        if (data == ioschema.outputRowFormatMap("TOK_TABLEROWFORMATNULL")) {
+          null
+        } else {
+          try {
+            f(data)
+          } catch {
+            case NonFatal(_) => null
+          }
         }
       }
 }
@@ -282,11 +286,18 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
       } else {
         val sb = new StringBuilder
-        sb.append(row.get(0, inputSchema(0)))
+        def appendToBuffer(s: AnyRef): Unit = {
+          if (s == null) {
+            sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATNULL"))
+          } else {
+            sb.append(s)
+          }
+        }
+        appendToBuffer(row.get(0, inputSchema(0)))
         var i = 1
         while (i < len) {
           sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
-          sb.append(row.get(i, inputSchema(i)))
+          appendToBuffer(row.get(i, inputSchema(i)))
           i += 1
         }
         sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
@@ -355,7 +366,8 @@ case class ScriptTransformationIOSchema(
 object ScriptTransformationIOSchema {
   val defaultFormat = Map(
     ("TOK_TABLEROWFORMATFIELD", "\u0001"),
-    ("TOK_TABLEROWFORMATLINES", "\n")
+    ("TOK_TABLEROWFORMATLINES", "\n"),
+    ("TOK_TABLEROWFORMATNULL" -> "\\N")
   )
 
   val defaultIOSchema = ScriptTransformationIOSchema(
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index d84659c..922a1d8 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -121,6 +121,49 @@ USING 'cat' AS (d)
   NULL DEFINED AS 'NULL'
 FROM t;
 
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'XXXX'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t;
+
 -- transform with defined row format delimit handle schema with correct type
 SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
   SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index 6f94e74..c1c13cd 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 47
+-- Number of queries: 51
 
 
 -- !query
@@ -202,9 +202,9 @@ FROM t
 -- !query schema
 struct<a:string,b:string,c:string,d:string>
 -- !query output
-1	true	Spark SQL	null
-2	false	Spark SQL	null
-3	true	Spark SQL	null
+1	true	Spark SQL	NULL
+2	false	Spark SQL	NULL
+3	true	Spark SQL	NULL
 
 
 -- !query
@@ -228,6 +228,84 @@ struct<d:string>
 
 
 -- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1	true	Spark SQL	\N
+2	false	Spark SQL	\N
+3	true	Spark SQL	\N
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1	true	Spark SQL	NULL
+2	false	Spark SQL	NULL
+3	true	Spark SQL	NULL
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'XXXX'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1	true	Spark SQL	XXXX
+2	false	Spark SQL	XXXX
+3	true	Spark SQL	XXXX
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS '\n'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+	NULL	NULL	NULL
+	NULL	NULL	NULL
+	NULL	NULL	NULL
+1	true	Spark SQL	
+2	false	Spark SQL	
+3	true	Spark SQL
+
+
+-- !query
 SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
   SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
     ROW FORMAT DELIMITED
@@ -455,7 +533,7 @@ GROUP BY b
 -- !query schema
 struct<a:string,b:string,c:string>
 -- !query output
-2	null	3
+2	NULL	3
 5	4	6
 
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org