You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/22 09:29:14 UTC
[spark] branch master updated: [SPARK-36156][SQL] SCRIPT TRANSFORM
ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value
should be `\N`
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bb09bd2 [SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
bb09bd2 is described below
commit bb09bd2e2dc1201df70b5ffbfaf7407c24b7fdd9
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Thu Jul 22 17:28:37 2021 +0800
[SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
### What changes were proposed in this pull request?
SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect `NULL DEFINED AS` and default value should be `\N`
![image](https://user-images.githubusercontent.com/46485123/125775377-611d4f06-f9e5-453a-990d-5a0018774f43.png)
![image](https://user-images.githubusercontent.com/46485123/125775387-6618bd0c-78d8-4457-bcc2-12dd70522946.png)
### Why are the changes needed?
Keep consistency with Hive
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT
Closes #33363 from AngersZhuuuu/SPARK-36156.
Authored-by: Angerszhuuuu <an...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/BaseScriptTransformationExec.scala | 30 +++++---
.../test/resources/sql-tests/inputs/transform.sql | 43 +++++++++++
.../resources/sql-tests/results/transform.sql.out | 88 ++++++++++++++++++++--
3 files changed, 147 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index e249cd6..fc3a124 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -118,7 +118,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
val processRowWithoutSerde = if (!ioschema.schemaLess) {
prevLine: String =>
new GenericInternalRow(
- prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
+ prevLine.split(outputRowFormat, -1).padTo(outputFieldWriters.size, null)
.zip(outputFieldWriters)
.map { case (data, writer) => writer(data) })
} else {
@@ -129,7 +129,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
prevLine: String =>
new GenericInternalRow(
- prevLine.split(outputRowFormat).slice(0, 2).padTo(2, null)
+ prevLine.split(outputRowFormat, -1).slice(0, 2).padTo(2, null)
.map(kvWriter))
}
@@ -247,10 +247,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
private val wrapperConvertException: (String => Any, Any => Any) => String => Any =
(f: String => Any, converter: Any => Any) =>
(data: String) => converter {
- try {
- f(data)
- } catch {
- case NonFatal(_) => null
+ if (data == ioschema.outputRowFormatMap("TOK_TABLEROWFORMATNULL")) {
+ null
+ } else {
+ try {
+ f(data)
+ } catch {
+ case NonFatal(_) => null
+ }
}
}
}
@@ -282,11 +286,18 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
} else {
val sb = new StringBuilder
- sb.append(row.get(0, inputSchema(0)))
+ def appendToBuffer(s: AnyRef): Unit = {
+ if (s == null) {
+ sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATNULL"))
+ } else {
+ sb.append(s)
+ }
+ }
+ appendToBuffer(row.get(0, inputSchema(0)))
var i = 1
while (i < len) {
sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
- sb.append(row.get(i, inputSchema(i)))
+ appendToBuffer(row.get(i, inputSchema(i)))
i += 1
}
sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
@@ -355,7 +366,8 @@ case class ScriptTransformationIOSchema(
object ScriptTransformationIOSchema {
val defaultFormat = Map(
("TOK_TABLEROWFORMATFIELD", "\u0001"),
- ("TOK_TABLEROWFORMATLINES", "\n")
+ ("TOK_TABLEROWFORMATLINES", "\n"),
+ ("TOK_TABLEROWFORMATNULL" -> "\\N")
)
val defaultIOSchema = ScriptTransformationIOSchema(
diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
index d84659c..922a1d8 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/transform.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -121,6 +121,49 @@ USING 'cat' AS (d)
NULL DEFINED AS 'NULL'
FROM t;
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS 'NULL'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS 'XXXX'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t;
+
-- transform with defined row format delimit handle schema with correct type
SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
index 6f94e74..c1c13cd 100644
--- a/sql/core/src/test/resources/sql-tests/results/transform.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 47
+-- Number of queries: 51
-- !query
@@ -202,9 +202,9 @@ FROM t
-- !query schema
struct<a:string,b:string,c:string,d:string>
-- !query output
-1 true Spark SQL null
-2 false Spark SQL null
-3 true Spark SQL null
+1 true Spark SQL NULL
+2 false Spark SQL NULL
+3 true Spark SQL NULL
-- !query
@@ -228,6 +228,84 @@ struct<d:string>
-- !query
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1 true Spark SQL \N
+2 false Spark SQL \N
+3 true Spark SQL \N
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1 true Spark SQL NULL
+2 false Spark SQL NULL
+3 true Spark SQL NULL
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS 'XXXX'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1 true Spark SQL XXXX
+2 false Spark SQL XXXX
+3 true Spark SQL XXXX
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+ NULL DEFINED AS '\n'
+USING 'cat' AS (a, b, c, d)
+ ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '@'
+ LINES TERMINATED BY '\n'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+ NULL NULL NULL
+ NULL NULL NULL
+ NULL NULL NULL
+1 true Spark SQL
+2 false Spark SQL
+3 true Spark SQL
+
+
+-- !query
SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
ROW FORMAT DELIMITED
@@ -455,7 +533,7 @@ GROUP BY b
-- !query schema
struct<a:string,b:string,c:string>
-- !query output
-2 null 3
+2 NULL 3
5 4 6
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org