You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/07/08 01:59:47 UTC
[spark] branch master updated: [SPARK-20680][SQL] Spark-sql do not
support for creating table with void column datatype
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b5297c4 [SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype
b5297c4 is described below
commit b5297c43b0bd5a62a20fb047fdee24ebd63f939d
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Tue Jul 7 18:58:01 2020 -0700
[SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype
### What changes were proposed in this pull request?
This is a new PR to address the previously closed one, #17953.
1. support "void" primitive data type in the `AstBuilder`, point it to `NullType`
2. forbid creating tables with VOID/NULL column type
### Why are the changes needed?
1. Spark is incompatible with the Hive void type. When a Hive table schema contains the void type, DESC table will throw an exception in Spark.
>hive> create table bad as select 1 x, null z from dual;
>hive> describe bad;
OK
x int
z void
In Spark2.0.x, the behaviour to read this view is normal:
>spark-sql> describe bad;
x int NULL
z void NULL
Time taken: 4.431 seconds, Fetched 2 row(s)
But in the latest Spark version, it fails with SparkException: Cannot recognize hive type string: void
>spark-sql> describe bad;
17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
org.apache.spark.SparkException: Cannot recognize hive type string: void
Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
DataType void() is not supported.(line 1, pos 0)
== SQL ==
void
^^^
... 61 more
org.apache.spark.SparkException: Cannot recognize hive type string: void
2. Hive CTAS statements throw an error when the select clause has a NULL/VOID type column, since HIVE-11217
In Spark, creating a table with a VOID/NULL column should throw a readable exception message, including:
- create data source table (using parquet, json, ...)
- create hive table (with or without stored as)
- CTAS
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Add unit tests
Closes #28833 from LantaoJin/SPARK-20680_COPY.
Authored-by: LantaoJin <ji...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
python/pyspark/sql/types.py | 3 +
.../sql/catalyst/analysis/ResolveCatalogs.scala | 11 ++
.../spark/sql/catalyst/parser/AstBuilder.scala | 1 +
.../sql/connector/catalog/CatalogV2Util.scala | 21 +++-
.../org/apache/spark/sql/types/NullType.scala | 4 +
.../sql/catalyst/parser/DataTypeParserSuite.scala | 1 +
.../catalyst/analysis/ResolveSessionCatalog.scala | 11 ++
.../spark/sql/execution/datasources/rules.scala | 3 +
.../sql-functions/sql-expression-schema.md | 2 +-
.../sql-tests/results/ansi/literals.sql.out | 2 +-
.../sql-tests/results/inline-table.sql.out | 2 +-
.../resources/sql-tests/results/literals.sql.out | 2 +-
.../sql-tests/results/misc-functions.sql.out | 2 +-
.../sql-tests/results/postgreSQL/select.sql.out | 4 +-
.../results/sql-compatibility-functions.sql.out | 6 +-
.../sql-tests/results/udf/udf-inline-table.sql.out | 2 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 2 +-
.../org/apache/spark/sql/hive/HiveStrategies.scala | 3 +
.../spark/sql/hive/execution/HiveDDLSuite.scala | 121 +++++++++++++++++++++
.../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 2 +-
20 files changed, 191 insertions(+), 14 deletions(-)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 320a68d..ddd13ca3 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -116,6 +116,9 @@ class NullType(DataType):
__metaclass__ = DataTypeSingleton
+ def simpleString(self):
+ return 'unknown'
+
class AtomicType(DataType):
"""An internal type used to represent everything that is not
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 2a0a944..a406040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+ cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+ cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+ a.dataType.foreach(failNullType)
a.dataType.foreach(failCharType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
@@ -157,6 +161,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+ if (c.asSelect.resolved) {
+ assertNoNullTypeInSchema(c.asSelect.schema)
+ }
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -172,6 +179,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
@@ -184,6 +192,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+ if (c.asSelect.resolved) {
+ assertNoNullTypeInSchema(c.asSelect.schema)
+ }
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d08bcb1..6b41a8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2203,6 +2203,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
DecimalType(precision.getText.toInt, 0)
case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
DecimalType(precision.getText.toInt, scale.getText.toInt)
+ case ("void", Nil) => NullType
case ("interval", Nil) => CalendarIntervalType
case (dt, params) =>
val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e1f3293..d130a13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@@ -346,4 +346,23 @@ private[sql] object CatalogV2Util {
}
}
}
+
+ def failNullType(dt: DataType): Unit = {
+ def containsNullType(dt: DataType): Boolean = dt match {
+ case ArrayType(et, _) => containsNullType(et)
+ case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
+ case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
+ case _ => dt.isInstanceOf[NullType]
+ }
+ if (containsNullType(dt)) {
+ throw new AnalysisException(
+ "Cannot create tables with unknown type.")
+ }
+ }
+
+ def assertNoNullTypeInSchema(schema: StructType): Unit = {
+ schema.foreach { f =>
+ failNullType(f.dataType)
+ }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
index 14097a5..6c9a1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -32,6 +32,10 @@ class NullType private() extends DataType {
override def defaultSize: Int = 1
private[spark] override def asNullable: NullType = this
+
+ // "null" is mainly used to represent a literal in Spark,
+ // it's better to avoid using it for data types.
+ override def simpleString: String = "unknown"
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index d519fdf..655b1d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -61,6 +61,7 @@ class DataTypeParserSuite extends SparkFunSuite {
checkDataType("varchAr(20)", StringType)
checkDataType("cHaR(27)", StringType)
checkDataType("BINARY", BinaryType)
+ checkDataType("void", NullType)
checkDataType("interval", CalendarIntervalType)
checkDataType("array<doublE>", ArrayType(DoubleType, true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index bf90875..bc3f38a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -48,6 +48,7 @@ class ResolveSessionCatalog(
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AlterTableAddColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+ cols.foreach(c => failNullType(c.dataType))
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -76,6 +77,7 @@ class ResolveSessionCatalog(
case AlterTableReplaceColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+ cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(_: V1Table) =>
throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
@@ -100,6 +102,7 @@ class ResolveSessionCatalog(
case a @ AlterTableAlterColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+ a.dataType.foreach(failNullType)
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -268,6 +271,7 @@ class ResolveSessionCatalog(
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ assertNoNullTypeInSchema(c.tableSchema)
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
if (!DDLUtils.isHiveTable(Some(provider))) {
@@ -292,6 +296,9 @@ class ResolveSessionCatalog(
case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+ if (c.asSelect.resolved) {
+ assertNoNullTypeInSchema(c.asSelect.schema)
+ }
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -319,6 +326,7 @@ class ResolveSessionCatalog(
// session catalog and the table provider is not v2.
case c @ ReplaceTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ assertNoNullTypeInSchema(c.tableSchema)
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
@@ -336,6 +344,9 @@ class ResolveSessionCatalog(
case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+ if (c.asSelect.resolved) {
+ assertNoNullTypeInSchema(c.asSelect.schema)
+ }
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 95343e2..60cacda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -292,6 +293,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
"in the table definition of " + table.identifier,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ assertNoNullTypeInSchema(schema)
+
val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 8898a11..c39adac 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -34,7 +34,7 @@
| org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> |
| org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> |
+| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> |
| org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> |
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
@@ -5,7 +5,7 @@
-- !query
select null, Null, nUll
-- !query schema
-struct<NULL:null,NULL:null,NULL:null>
+struct<NULL:unknown,NULL:unknown,NULL:unknown>
-- !query output
NULL NULL NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index 9943b93..2dd6960 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -49,7 +49,7 @@ two 2
-- !query
select * from values ("one", null), ("two", null) as data(a, b)
-- !query schema
-struct<a:string,b:null>
+struct<a:string,b:unknown>
-- !query output
one NULL
two NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
@@ -5,7 +5,7 @@
-- !query
select null, Null, nUll
-- !query schema
-struct<NULL:null,NULL:null,NULL:null>
+struct<NULL:unknown,NULL:unknown,NULL:unknown>
-- !query output
NULL NULL NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
index bd8ffb8..8d34bf2 100644
--- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
@@ -7,7 +7,7 @@ select typeof(null)
-- !query schema
struct<typeof(NULL):string>
-- !query output
-null
+unknown
-- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
index 1e59036..8b32bd6 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
@@ -308,7 +308,7 @@ struct<1:int>
-- !query
select foo.* from (select null) as foo
-- !query schema
-struct<NULL:null>
+struct<NULL:unknown>
-- !query output
NULL
@@ -316,7 +316,7 @@ NULL
-- !query
select foo.* from (select 'xyzzy',1,null) as foo
-- !query schema
-struct<xyzzy:string,1:int,NULL:null>
+struct<xyzzy:string,1:int,NULL:unknown>
-- !query output
xyzzy 1 NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
index 26a44a8..b905f9e 100644
--- a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
@@ -5,7 +5,7 @@
-- !query
SELECT ifnull(null, 'x'), ifnull('y', 'x'), ifnull(null, null)
-- !query schema
-struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):null>
+struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):unknown>
-- !query output
x y NULL
@@ -21,7 +21,7 @@ NULL x
-- !query
SELECT nvl(null, 'x'), nvl('y', 'x'), nvl(null, null)
-- !query schema
-struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):null>
+struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):unknown>
-- !query output
x y NULL
@@ -29,7 +29,7 @@ x y NULL
-- !query
SELECT nvl2(null, 'x', 'y'), nvl2('n', 'x', 'y'), nvl2(null, null, null)
-- !query schema
-struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):null>
+struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):unknown>
-- !query output
y x NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
index d78d347..0680a87 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
@@ -49,7 +49,7 @@ two 2
-- !query
select udf(a), b from values ("one", null), ("two", null) as data(a, b)
-- !query schema
-struct<CAST(udf(cast(a as string)) AS STRING):string,b:null>
+struct<CAST(udf(cast(a as string)) AS STRING):string,b:unknown>
-- !query output
one NULL
two NULL
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 231a8f2..daa262d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -405,7 +405,7 @@ class FileBasedDataSourceSuite extends QueryTest
""
}
def errorMessage(format: String): String = {
- s"$format data source does not support null data type."
+ s"$format data source does not support unknown data type."
}
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
withTempDir { dir =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b9c98f4..2b1eb05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -225,6 +226,8 @@ case class RelationConversions(
isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
+ // This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
+ assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e8cf4ad..774fb5b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2309,6 +2310,126 @@ class HiveDDLSuite
}
}
+ test("SPARK-20680: Spark-sql do not support for unknown column datatype") {
+ withTable("t") {
+ withView("tabUnknownType") {
+ hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
+ hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
+ hiveClient.runSqlHive("CREATE VIEW tabUnknownType AS SELECT NULL AS col FROM t")
+ checkAnswer(spark.table("tabUnknownType"), Row(null))
+ // No exception shows
+ val desc = spark.sql("DESC tabUnknownType").collect().toSeq
+ assert(desc.contains(Row("col", NullType.simpleString, null)))
+ }
+ }
+
+ // Forbid CTAS with unknown type
+ withTable("t1", "t2", "t3") {
+ val e1 = intercept[AnalysisException] {
+ spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT null as null_col")
+ }.getMessage
+ assert(e1.contains("Cannot create tables with unknown type"))
+
+ val e2 = intercept[AnalysisException] {
+ spark.sql("CREATE TABLE t2 AS SELECT null as null_col")
+ }.getMessage
+ assert(e2.contains("Cannot create tables with unknown type"))
+
+ val e3 = intercept[AnalysisException] {
+ spark.sql("CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col")
+ }.getMessage
+ assert(e3.contains("Cannot create tables with unknown type"))
+ }
+
+ // Forbid Replace table AS SELECT with unknown type
+ withTable("t") {
+ val v2Source = classOf[FakeV2Provider].getName
+ val e = intercept[AnalysisException] {
+ spark.sql(s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col")
+ }.getMessage
+ assert(e.contains("Cannot create tables with unknown type"))
+ }
+
+ // Forbid creating table with VOID type in Spark
+ withTable("t1", "t2", "t3", "t4") {
+ val e1 = intercept[AnalysisException] {
+ spark.sql(s"CREATE TABLE t1 (v VOID) USING PARQUET")
+ }.getMessage
+ assert(e1.contains("Cannot create tables with unknown type"))
+ val e2 = intercept[AnalysisException] {
+ spark.sql(s"CREATE TABLE t2 (v VOID) USING hive")
+ }.getMessage
+ assert(e2.contains("Cannot create tables with unknown type"))
+ val e3 = intercept[AnalysisException] {
+ spark.sql(s"CREATE TABLE t3 (v VOID)")
+ }.getMessage
+ assert(e3.contains("Cannot create tables with unknown type"))
+ val e4 = intercept[AnalysisException] {
+ spark.sql(s"CREATE TABLE t4 (v VOID) STORED AS PARQUET")
+ }.getMessage
+ assert(e4.contains("Cannot create tables with unknown type"))
+ }
+
+ // Forbid Replace table with VOID type
+ withTable("t") {
+ val v2Source = classOf[FakeV2Provider].getName
+ val e = intercept[AnalysisException] {
+ spark.sql(s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source")
+ }.getMessage
+ assert(e.contains("Cannot create tables with unknown type"))
+ }
+
+ // Make sure spark.catalog.createTable with null type will fail
+ val schema1 = new StructType().add("c", NullType)
+ assertHiveTableNullType(schema1)
+ assertDSTableNullType(schema1)
+
+ val schema2 = new StructType()
+ .add("c", StructType(Seq(StructField.apply("c1", NullType))))
+ assertHiveTableNullType(schema2)
+ assertDSTableNullType(schema2)
+
+ val schema3 = new StructType().add("c", ArrayType(NullType))
+ assertHiveTableNullType(schema3)
+ assertDSTableNullType(schema3)
+
+ val schema4 = new StructType()
+ .add("c", MapType(StringType, NullType))
+ assertHiveTableNullType(schema4)
+ assertDSTableNullType(schema4)
+
+ val schema5 = new StructType()
+ .add("c", MapType(NullType, StringType))
+ assertHiveTableNullType(schema5)
+ assertDSTableNullType(schema5)
+ }
+
+ private def assertHiveTableNullType(schema: StructType): Unit = {
+ withTable("t") {
+ val e = intercept[AnalysisException] {
+ spark.catalog.createTable(
+ tableName = "t",
+ source = "hive",
+ schema = schema,
+ options = Map("fileFormat" -> "parquet"))
+ }.getMessage
+ assert(e.contains("Cannot create tables with unknown type"))
+ }
+ }
+
+ private def assertDSTableNullType(schema: StructType): Unit = {
+ withTable("t") {
+ val e = intercept[AnalysisException] {
+ spark.catalog.createTable(
+ tableName = "t",
+ source = "json",
+ schema = schema,
+ options = Map.empty[String, String])
+ }.getMessage
+ assert(e.contains("Cannot create tables with unknown type"))
+ }
+ }
+
test("SPARK-21216: join with a streaming DataFrame") {
import org.apache.spark.sql.execution.streaming.MemoryStream
import testImplicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 91fd8a4..61c48c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -121,7 +121,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
msg = intercept[AnalysisException] {
sql("select null").write.mode("overwrite").orc(orcDir)
}.getMessage
- assert(msg.contains("ORC data source does not support null data type."))
+ assert(msg.contains("ORC data source does not support unknown data type."))
msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org