Posted to commits@spark.apache.org by do...@apache.org on 2020/07/08 01:59:47 UTC

[spark] branch master updated: [SPARK-20680][SQL] Spark-sql does not support creating a table with a void column datatype

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b5297c4  [SPARK-20680][SQL] Spark-sql does not support creating a table with a void column datatype
b5297c4 is described below

commit b5297c43b0bd5a62a20fb047fdee24ebd63f939d
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Tue Jul 7 18:58:01 2020 -0700

    [SPARK-20680][SQL] Spark-sql does not support creating a table with a void column datatype
    
    ### What changes were proposed in this pull request?
    
    This is a new PR to replace the closed one, #17953.
    
    1. support "void" primitive data type in the `AstBuilder`, point it to `NullType`
    2. forbid creating tables with VOID/NULL column type
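    
    A minimal sketch of the parser change (illustrative only, not part of this patch; it assumes Spark's catalyst module on the classpath):
    
    ```scala
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types.NullType
    
    // After this change the parser accepts Hive's "void" keyword and maps it
    // to NullType, whose simpleString is now reported as "unknown".
    val dt = CatalystSqlParser.parseDataType("void")
    assert(dt == NullType)
    assert(dt.simpleString == "unknown")
    ```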
    
    ### Why are the changes needed?
    
    1. Spark is incompatible with Hive's void type. When a Hive table schema contains a void column, DESC TABLE throws an exception in Spark.
    
    >hive> create table bad as select 1 x, null z from dual;
    >hive> describe bad;
    OK
    x	int
    z	void
    
    In Spark 2.0.x, describing this table works as expected:
    >spark-sql> describe bad;
    x       int     NULL
    z       void    NULL
    Time taken: 4.431 seconds, Fetched 2 row(s)
    
    But in the latest Spark version, it fails with `SparkException: Cannot recognize hive type string: void`:
    
    >spark-sql> describe bad;
    17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
    org.apache.spark.SparkException: Cannot recognize hive type string: void
    Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
    DataType void() is not supported.(line 1, pos 0)
    == SQL ==
    void
    ^^^
            ... 61 more
    org.apache.spark.SparkException: Cannot recognize hive type string: void
    
    2. Since HIVE-11217, Hive CTAS statements throw an error when the SELECT clause has a NULL/VOID type column.
    In Spark, creating a table with a VOID/NULL column should likewise fail with a readable exception message; a sketch mirroring the new unit tests follows the list below. This covers:
    
    - creating a data source table (USING parquet, json, ...)
    - creating a Hive table (with or without STORED AS)
    - CTAS
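    
    The sketch below (illustrative only; it mirrors the unit tests added in HiveDDLSuite and assumes an active `spark` session) shows the expected error for each of these paths:
    
    ```scala
    import org.apache.spark.sql.{AnalysisException, SparkSession}
    
    // Helper: run a DDL statement and check it fails with the new readable message.
    def expectUnknownTypeError(spark: SparkSession, ddl: String): Unit = {
      try {
        spark.sql(ddl)
        assert(false, s"expected AnalysisException for: $ddl")
      } catch {
        case e: AnalysisException =>
          assert(e.getMessage.contains("Cannot create tables with unknown type"))
      }
    }
    
    // data source table (USING parquet, json, ...)
    expectUnknownTypeError(spark, "CREATE TABLE t1 (v VOID) USING PARQUET")
    // hive table, with or without STORED AS
    expectUnknownTypeError(spark, "CREATE TABLE t2 (v VOID)")
    expectUnknownTypeError(spark, "CREATE TABLE t3 (v VOID) STORED AS PARQUET")
    // CTAS whose SELECT clause produces a NULL-typed column
    expectUnknownTypeError(spark, "CREATE TABLE t4 AS SELECT null AS null_col")
    ```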
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add unit tests
    
    Closes #28833 from LantaoJin/SPARK-20680_COPY.
    
    Authored-by: LantaoJin <ji...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 python/pyspark/sql/types.py                        |   3 +
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  11 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   1 +
 .../sql/connector/catalog/CatalogV2Util.scala      |  21 +++-
 .../org/apache/spark/sql/types/NullType.scala      |   4 +
 .../sql/catalyst/parser/DataTypeParserSuite.scala  |   1 +
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  11 ++
 .../spark/sql/execution/datasources/rules.scala    |   3 +
 .../sql-functions/sql-expression-schema.md         |   2 +-
 .../sql-tests/results/ansi/literals.sql.out        |   2 +-
 .../sql-tests/results/inline-table.sql.out         |   2 +-
 .../resources/sql-tests/results/literals.sql.out   |   2 +-
 .../sql-tests/results/misc-functions.sql.out       |   2 +-
 .../sql-tests/results/postgreSQL/select.sql.out    |   4 +-
 .../results/sql-compatibility-functions.sql.out    |   6 +-
 .../sql-tests/results/udf/udf-inline-table.sql.out |   2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |   2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |   3 +
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 121 +++++++++++++++++++++
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala    |   2 +-
 20 files changed, 191 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 320a68d..ddd13ca3 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -116,6 +116,9 @@ class NullType(DataType):
 
     __metaclass__ = DataTypeSingleton
 
+    def simpleString(self):
+        return 'unknown'
+
 
 class AtomicType(DataType):
     """An internal type used to represent everything that is not
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 2a0a944..a406040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case a @ AlterTableAlterColumnStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       a.dataType.foreach(failCharType)
       val colName = a.column.toArray
       val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
@@ -157,6 +161,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -172,6 +179,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
@@ -184,6 +192,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d08bcb1..6b41a8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2203,6 +2203,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         DecimalType(precision.getText.toInt, 0)
       case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
         DecimalType(precision.getText.toInt, scale.getText.toInt)
+      case ("void", Nil) => NullType
       case ("interval", Nil) => CalendarIntervalType
       case (dt, params) =>
         val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e1f3293..d130a13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.AlterTable
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -346,4 +346,23 @@ private[sql] object CatalogV2Util {
       }
     }
   }
+
+  def failNullType(dt: DataType): Unit = {
+    def containsNullType(dt: DataType): Boolean = dt match {
+      case ArrayType(et, _) => containsNullType(et)
+      case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
+      case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
+      case _ => dt.isInstanceOf[NullType]
+    }
+    if (containsNullType(dt)) {
+      throw new AnalysisException(
+        "Cannot create tables with unknown type.")
+    }
+  }
+
+  def assertNoNullTypeInSchema(schema: StructType): Unit = {
+    schema.foreach { f =>
+      failNullType(f.dataType)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
index 14097a5..6c9a1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -32,6 +32,10 @@ class NullType private() extends DataType {
   override def defaultSize: Int = 1
 
   private[spark] override def asNullable: NullType = this
+
+  // "null" is mainly used to represent a literal in Spark,
+  // it's better to avoid using it for data types.
+  override def simpleString: String = "unknown"
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index d519fdf..655b1d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -61,6 +61,7 @@ class DataTypeParserSuite extends SparkFunSuite {
   checkDataType("varchAr(20)", StringType)
   checkDataType("cHaR(27)", StringType)
   checkDataType("BINARY", BinaryType)
+  checkDataType("void", NullType)
   checkDataType("interval", CalendarIntervalType)
 
   checkDataType("array<doublE>", ArrayType(DoubleType, true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index bf90875..bc3f38a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -48,6 +48,7 @@ class ResolveSessionCatalog(
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case AlterTableAddColumnsStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -76,6 +77,7 @@ class ResolveSessionCatalog(
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(_: V1Table) =>
           throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
@@ -100,6 +102,7 @@ class ResolveSessionCatalog(
 
     case a @ AlterTableAlterColumnStatement(
          nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -268,6 +271,7 @@ class ResolveSessionCatalog(
     // session catalog and the table provider is not v2.
     case c @ CreateTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         if (!DDLUtils.isHiveTable(Some(provider))) {
@@ -292,6 +296,9 @@ class ResolveSessionCatalog(
 
     case c @ CreateTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -319,6 +326,7 @@ class ResolveSessionCatalog(
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
@@ -336,6 +344,9 @@ class ResolveSessionCatalog(
 
     case c @ ReplaceTableAsSelectStatement(
          SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 95343e2..60cacda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -292,6 +293,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       "in the table definition of " + table.identifier,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
+    assertNoNullTypeInSchema(schema)
+
     val normalizedPartCols = normalizePartitionColumns(schema, table)
     val normalizedBucketSpec = normalizeBucketSpec(schema, table)
 
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 8898a11..c39adac 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -34,7 +34,7 @@
 | org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> |
 | org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> |
+| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> |
 | org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> |
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
@@ -5,7 +5,7 @@
 -- !query
 select null, Null, nUll
 -- !query schema
-struct<NULL:null,NULL:null,NULL:null>
+struct<NULL:unknown,NULL:unknown,NULL:unknown>
 -- !query output
 NULL	NULL	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index 9943b93..2dd6960 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -49,7 +49,7 @@ two	2
 -- !query
 select * from values ("one", null), ("two", null) as data(a, b)
 -- !query schema
-struct<a:string,b:null>
+struct<a:string,b:unknown>
 -- !query output
 one	NULL
 two	NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
index f6720f6..0274771 100644
--- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
@@ -5,7 +5,7 @@
 -- !query
 select null, Null, nUll
 -- !query schema
-struct<NULL:null,NULL:null,NULL:null>
+struct<NULL:unknown,NULL:unknown,NULL:unknown>
 -- !query output
 NULL	NULL	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
index bd8ffb8..8d34bf2 100644
--- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out
@@ -7,7 +7,7 @@ select typeof(null)
 -- !query schema
 struct<typeof(NULL):string>
 -- !query output
-null
+unknown
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
index 1e59036..8b32bd6 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out
@@ -308,7 +308,7 @@ struct<1:int>
 -- !query
 select foo.* from (select null) as foo
 -- !query schema
-struct<NULL:null>
+struct<NULL:unknown>
 -- !query output
 NULL
 
@@ -316,7 +316,7 @@ NULL
 -- !query
 select foo.* from (select 'xyzzy',1,null) as foo
 -- !query schema
-struct<xyzzy:string,1:int,NULL:null>
+struct<xyzzy:string,1:int,NULL:unknown>
 -- !query output
 xyzzy	1	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
index 26a44a8..b905f9e 100644
--- a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
@@ -5,7 +5,7 @@
 -- !query
 SELECT ifnull(null, 'x'), ifnull('y', 'x'), ifnull(null, null)
 -- !query schema
-struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):null>
+struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):unknown>
 -- !query output
 x	y	NULL
 
@@ -21,7 +21,7 @@ NULL	x
 -- !query
 SELECT nvl(null, 'x'), nvl('y', 'x'), nvl(null, null)
 -- !query schema
-struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):null>
+struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):unknown>
 -- !query output
 x	y	NULL
 
@@ -29,7 +29,7 @@ x	y	NULL
 -- !query
 SELECT nvl2(null, 'x', 'y'), nvl2('n', 'x', 'y'), nvl2(null, null, null)
 -- !query schema
-struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):null>
+struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):unknown>
 -- !query output
 y	x	NULL
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
index d78d347..0680a87 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
@@ -49,7 +49,7 @@ two	2
 -- !query
 select udf(a), b from values ("one", null), ("two", null) as data(a, b)
 -- !query schema
-struct<CAST(udf(cast(a as string)) AS STRING):string,b:null>
+struct<CAST(udf(cast(a as string)) AS STRING):string,b:unknown>
 -- !query output
 one	NULL
 two	NULL
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 231a8f2..daa262d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -405,7 +405,7 @@ class FileBasedDataSourceSuite extends QueryTest
         ""
       }
       def errorMessage(format: String): String = {
-        s"$format data source does not support null data type."
+        s"$format data source does not support unknown data type."
       }
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
         withTempDir { dir =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b9c98f4..2b1eb05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
@@ -225,6 +226,8 @@ case class RelationConversions(
             isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
+        // This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
+        assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e8cf4ad..774fb5b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
@@ -2309,6 +2310,126 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-20680: Spark-sql do not support for unknown column datatype") {
+    withTable("t") {
+      withView("tabUnknownType") {
+        hiveClient.runSqlHive("CREATE TABLE t (t1 int)")
+        hiveClient.runSqlHive("INSERT INTO t VALUES (3)")
+        hiveClient.runSqlHive("CREATE VIEW tabUnknownType AS SELECT NULL AS col FROM t")
+        checkAnswer(spark.table("tabUnknownType"), Row(null))
+        // No exception shows
+        val desc = spark.sql("DESC tabUnknownType").collect().toSeq
+        assert(desc.contains(Row("col", NullType.simpleString, null)))
+      }
+    }
+
+    // Forbid CTAS with unknown type
+    withTable("t1", "t2", "t3") {
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT null as null_col")
+      }.getMessage
+      assert(e1.contains("Cannot create tables with unknown type"))
+
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 AS SELECT null as null_col")
+      }.getMessage
+      assert(e2.contains("Cannot create tables with unknown type"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col")
+      }.getMessage
+      assert(e3.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid Replace table AS SELECT with unknown type
+    withTable("t") {
+      val v2Source = classOf[FakeV2Provider].getName
+      val e = intercept[AnalysisException] {
+        spark.sql(s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col")
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid creating table with VOID type in Spark
+    withTable("t1", "t2", "t3", "t4") {
+      val e1 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t1 (v VOID) USING PARQUET")
+      }.getMessage
+      assert(e1.contains("Cannot create tables with unknown type"))
+      val e2 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t2 (v VOID) USING hive")
+      }.getMessage
+      assert(e2.contains("Cannot create tables with unknown type"))
+      val e3 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t3 (v VOID)")
+      }.getMessage
+      assert(e3.contains("Cannot create tables with unknown type"))
+      val e4 = intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t4 (v VOID) STORED AS PARQUET")
+      }.getMessage
+      assert(e4.contains("Cannot create tables with unknown type"))
+    }
+
+    // Forbid Replace table with VOID type
+    withTable("t") {
+      val v2Source = classOf[FakeV2Provider].getName
+      val e = intercept[AnalysisException] {
+        spark.sql(s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source")
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+
+    // Make sure spark.catalog.createTable with null type will fail
+    val schema1 = new StructType().add("c", NullType)
+    assertHiveTableNullType(schema1)
+    assertDSTableNullType(schema1)
+
+    val schema2 = new StructType()
+      .add("c", StructType(Seq(StructField.apply("c1", NullType))))
+    assertHiveTableNullType(schema2)
+    assertDSTableNullType(schema2)
+
+    val schema3 = new StructType().add("c", ArrayType(NullType))
+    assertHiveTableNullType(schema3)
+    assertDSTableNullType(schema3)
+
+    val schema4 = new StructType()
+      .add("c", MapType(StringType, NullType))
+    assertHiveTableNullType(schema4)
+    assertDSTableNullType(schema4)
+
+    val schema5 = new StructType()
+      .add("c", MapType(NullType, StringType))
+    assertHiveTableNullType(schema5)
+    assertDSTableNullType(schema5)
+  }
+
+  private def assertHiveTableNullType(schema: StructType): Unit = {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "hive",
+          schema = schema,
+          options = Map("fileFormat" -> "parquet"))
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+  }
+
+  private def assertDSTableNullType(schema: StructType): Unit = {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "json",
+          schema = schema,
+          options = Map.empty[String, String])
+      }.getMessage
+      assert(e.contains("Cannot create tables with unknown type"))
+    }
+  }
+
   test("SPARK-21216: join with a streaming DataFrame") {
     import org.apache.spark.sql.execution.streaming.MemoryStream
     import testImplicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 91fd8a4..61c48c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -121,7 +121,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
       msg = intercept[AnalysisException] {
         sql("select null").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("ORC data source does not support null data type."))
+      assert(msg.contains("ORC data source does not support unknown data type."))
 
       msg = intercept[AnalysisException] {
         spark.udf.register("testType", () => new IntervalData())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org