Posted to commits@spark.apache.org by we...@apache.org on 2019/03/19 12:30:16 UTC

[spark] branch master updated: [SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e402de5  [SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`
e402de5 is described below

commit e402de5fd030cdc4150fda0755c7c636cad9619e
Author: s71955 <su...@gmail.com>
AuthorDate: Tue Mar 19 20:29:47 2019 +0800

    [SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`
    
    ## What changes were proposed in this pull request?
    Currently, creating a table with the Hive serde via "STORED AS" and invalid column names aborts the job. We should prevent this by raising an **AnalysisException** that guides users to rename the column with an alias, as Parquet data source tables already do, making the error message consistent with the one shown when creating a native Parquet/ORC table.
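    
    For illustration, here is a minimal, self-contained sketch of the kind of field-name check involved (simplified and hypothetical; Spark's actual validation lives in `ParquetSchemaConverter.checkFieldNames` and raises `AnalysisException`):
    ```scala
    // Hypothetical standalone sketch -- not Spark's exact implementation.
    object FieldNameCheck {
      // Characters that Parquet field names must not contain.
      private val invalidChars = " ,;{}()\n\t="
    
      def checkFieldNames(names: Seq[String]): Unit = names.foreach { name =>
        if (name.exists(c => invalidChars.contains(c))) {
          throw new IllegalArgumentException(
            s"""Attribute name "$name" contains invalid character(s). """ +
              "Please use alias to rename it.")
        }
      }
    }
    
    // Example: FieldNameCheck.checkFieldNames(Seq("COUNT(ID)")) throws,
    // because '(' and ')' are invalid in Parquet field names.
    ```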
    
    **BEFORE**
    ```scala
    scala> sql("set spark.sql.hive.convertMetastoreParquet=false")
    scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
    Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.OriginalType.col1
    ```
    
    **AFTER**
    ```scala
    scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
    org.apache.spark.sql.AnalysisException: Attribute name "count(ID)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
    ```
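    
    As the message suggests, renaming the column with a valid alias is expected to let the CTAS succeed, for example:
    ```scala
    scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `count_id`")
    ```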
    
    ## How was this patch tested?
    Pass the Jenkins with the newly added test case.
    
    Closes #24075 from sujith71955/master_serde.
    
    Authored-by: s71955 <su...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  | 3 ++-
 .../apache/spark/sql/execution/datasources/DataSourceStrategy.scala  | 2 --
 .../scala/org/apache/spark/sql/execution/datasources/rules.scala     | 3 +++
 .../src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala    | 5 ++---
 .../scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    | 5 +++++
 5 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bcd8908..235801a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -886,7 +886,8 @@ object DDLUtils {
           if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
             OrcFileFormat.checkFieldNames(colNames)
           } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
-              serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
+            serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
+            serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
             ParquetSchemaConverter.checkFieldNames(colNames)
           }
         case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a1252ee..4c69927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,12 +133,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
-      DDLUtils.checkDataColNames(tableDesc)
       CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
-      DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
 
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 949aa66..4e7fc40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -206,6 +206,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
         val analyzedQuery = query.get
         val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
 
+        DDLUtils.checkDataColNames(tableDesc.copy(schema = analyzedQuery.schema))
+
         val output = analyzedQuery.output
         val partitionAttrs = normalizedTable.partitionColumnNames.map { partCol =>
           output.find(_.name == partCol).get
@@ -219,6 +221,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
 
         c.copy(tableDesc = normalizedTable, query = Some(reorderedQuery))
       } else {
+        DDLUtils.checkDataColNames(tableDesc)
         val normalizedTable = normalizeCatalogTable(tableDesc.schema, tableDesc)
 
         val partitionSchema = normalizedTable.partitionColumnNames.map { partCol =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8a5ab18..58b7110 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -151,11 +151,9 @@ object HiveAnalysis extends Rule[LogicalPlan] {
         ifPartitionNotExists, query.output.map(_.name))
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
-      DDLUtils.checkDataColNames(tableDesc)
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      DDLUtils.checkDataColNames(tableDesc)
       CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
@@ -210,7 +208,8 @@ case class RelationConversions(
       case CreateTable(tableDesc, mode, Some(query))
           if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
             isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
-        DDLUtils.checkDataColNames(tableDesc)
+        // Validation must happen here, before the relation is converted.
+        DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ce7661a..aad34a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2163,6 +2163,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           }.getMessage
           assert(m.contains(s"contains invalid character(s)"))
 
+          val m1 = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t21912 STORED AS $source AS SELECT 1 `col$name`")
+          }.getMessage
+          assert(m1.contains(s"contains invalid character(s)"))
+
           val m2 = intercept[AnalysisException] {
             sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
           }.getMessage

