You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/19 12:30:16 UTC
[spark] branch master updated: [SPARK-26176][SQL] Verify column
names for CTAS with `STORED AS`
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e402de5 [SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`
e402de5 is described below
commit e402de5fd030cdc4150fda0755c7c636cad9619e
Author: s71955 <su...@gmail.com>
AuthorDate: Tue Mar 19 20:29:47 2019 +0800
[SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`
## What changes were proposed in this pull request?
Currently, users meet job abortions while creating a table using the Hive serde "STORED AS" with invalid column names. We had better prevent this by raising **AnalysisException** with a guide to use aliases instead, like Parquet data source tables,
thus making it consistent with the error message shown while creating Parquet/ORC native tables.
**BEFORE**
```scala
scala> sql("set spark.sql.hive.convertMetastoreParquet=false")
scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.OriginalType.col1
```
**AFTER**
```scala
scala> sql("CREATE TABLE a STORED AS PARQUET AS SELECT 1 AS `COUNT(ID)`")
org.apache.spark.sql.AnalysisException: Attribute name "count(ID)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
```
## How was this patch tested?
Pass the Jenkins with the newly added test case.
Closes #24075 from sujith71955/master_serde.
Authored-by: s71955 <su...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 3 ++-
.../apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 2 --
.../scala/org/apache/spark/sql/execution/datasources/rules.scala | 3 +++
.../src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 5 ++---
.../scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 5 +++++
5 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bcd8908..235801a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -886,7 +886,8 @@ object DDLUtils {
if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
OrcFileFormat.checkFieldNames(colNames)
} else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
- serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
+ serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
+ serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
ParquetSchemaConverter.checkFieldNames(colNames)
}
case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a1252ee..4c69927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,12 +133,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
- DDLUtils.checkDataColNames(tableDesc)
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
- DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 949aa66..4e7fc40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -206,6 +206,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
val analyzedQuery = query.get
val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
+ DDLUtils.checkDataColNames(tableDesc.copy(schema = analyzedQuery.schema))
+
val output = analyzedQuery.output
val partitionAttrs = normalizedTable.partitionColumnNames.map { partCol =>
output.find(_.name == partCol).get
@@ -219,6 +221,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
c.copy(tableDesc = normalizedTable, query = Some(reorderedQuery))
} else {
+ DDLUtils.checkDataColNames(tableDesc)
val normalizedTable = normalizeCatalogTable(tableDesc.schema, tableDesc)
val partitionSchema = normalizedTable.partitionColumnNames.map { partCol =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8a5ab18..58b7110 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -151,11 +151,9 @@ object HiveAnalysis extends Rule[LogicalPlan] {
ifPartitionNotExists, query.output.map(_.name))
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
- DDLUtils.checkDataColNames(tableDesc)
CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
- DDLUtils.checkDataColNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode)
case InsertIntoDir(isLocal, storage, provider, child, overwrite)
@@ -210,7 +208,8 @@ case class RelationConversions(
case CreateTable(tableDesc, mode, Some(query))
if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
- DDLUtils.checkDataColNames(tableDesc)
+ // validation is required to be done here before relation conversion.
+ DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ce7661a..aad34a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2163,6 +2163,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}.getMessage
assert(m.contains(s"contains invalid character(s)"))
+ val m1 = intercept[AnalysisException] {
+ sql(s"CREATE TABLE t21912 STORED AS $source AS SELECT 1 `col$name`")
+ }.getMessage
+ assert(m1.contains(s"contains invalid character(s)"))
+
val m2 = intercept[AnalysisException] {
sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
}.getMessage
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org