You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/07 05:20:52 UTC
spark git commit: [SPARK-21912][SQL] ORC/Parquet table should not
create invalid column names
Repository: spark
Updated Branches:
refs/heads/master ce7293c15 -> eea2b877c
[SPARK-21912][SQL] ORC/Parquet table should not create invalid column names
## What changes were proposed in this pull request?
Currently, users meet job abortions while creating or altering ORC/Parquet tables with invalid column names. We had better prevent this by raising **AnalysisException** with a guide to use aliases instead like Paquet data source tables.
**BEFORE**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:28:21 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Error: : expected at the position 8 of 'struct<a b:int>' but ' ' is found.
17/09/04 13:28:21 ERROR FileFormatWriter: Job job_20170904132821_0001 aborted.
17/09/04 13:28:21 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows.
```
**AFTER**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:27:40 ERROR CreateDataSourceTableAsSelectCommand: Failed to write to table orc1
org.apache.spark.sql.AnalysisException: Attribute name "a b" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
```
## How was this patch tested?
Pass the Jenkins with a new test case.
Author: Dongjoon Hyun <do...@apache.org>
Closes #19124 from dongjoon-hyun/SPARK-21912.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eea2b877
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eea2b877
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eea2b877
Branch: refs/heads/master
Commit: eea2b877cf4e6ba4ea524bf8d782516add1b093e
Parents: ce7293c
Author: Dongjoon Hyun <do...@apache.org>
Authored: Wed Sep 6 22:20:48 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 6 22:20:48 2017 -0700
----------------------------------------------------------------------
.../spark/sql/execution/command/ddl.scala | 21 ++++++++++
.../spark/sql/execution/command/tables.scala | 5 ++-
.../datasources/DataSourceStrategy.scala | 2 +
.../datasources/orc/OrcFileFormat.scala | 42 ++++++++++++++++++++
.../parquet/ParquetSchemaConverter.scala | 2 +-
.../resources/sql-tests/inputs/show_columns.sql | 4 +-
.../sql-tests/results/show_columns.sql.out | 4 +-
.../apache/spark/sql/hive/HiveStrategies.scala | 2 +
.../sql/hive/execution/SQLQuerySuite.scala | 34 ++++++++++++++++
9 files changed, 109 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index dae160f..7611e1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -34,6 +34,9 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
+import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -848,4 +851,22 @@ object DDLUtils {
}
}
}
+
+ private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
+ table.provider.foreach {
+ _.toLowerCase(Locale.ROOT) match {
+ case HIVE_PROVIDER =>
+ val serde = table.storage.serde
+ if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
+ OrcFileFormat.checkFieldNames(table.dataSchema)
+ } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
+ serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
+ ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+ }
+ case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+ case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
+ case _ =>
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 694d517..1dddc1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -201,13 +201,14 @@ case class AlterTableAddColumnsCommand(
// make sure any partition columns are at the end of the fields
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+ val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray)
SchemaUtils.checkColumnNameDuplication(
reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
+ DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema))
- catalog.alterTableSchema(
- table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+ catalog.alterTableSchema(table, newSchema)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 0deac19..5d6223d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -130,10 +130,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+ DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
+ DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema))
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
new file mode 100644
index 0000000..2eeb006
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types.StructType
+
+private[sql] object OrcFileFormat {
+ private def checkFieldName(name: String): Unit = {
+ try {
+ TypeDescription.fromString(s"struct<$name:int>")
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new AnalysisException(
+ s"""Column name "$name" contains invalid character(s).
+ |Please use alias to rename it.
+ """.stripMargin.split("\n").mkString(" ").trim)
+ }
+ }
+
+ def checkFieldNames(schema: StructType): StructType = {
+ schema.fieldNames.foreach(checkFieldName)
+ schema
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 0b805e4..b3781cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -556,7 +556,7 @@ private[parquet] class ParquetSchemaConverter(
}
}
-private[parquet] object ParquetSchemaConverter {
+private[sql] object ParquetSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
val EMPTY_MESSAGE: MessageType =
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
index 1e02c2f..521018e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
@@ -2,9 +2,9 @@ CREATE DATABASE showdb;
USE showdb;
-CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet;
+CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json;
CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month);
-CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet;
+CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json;
CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`;
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out
index 05c3a08..71d6e12 100644
--- a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out
@@ -19,7 +19,7 @@ struct<>
-- !query 2
-CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet
+CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json
-- !query 2 schema
struct<>
-- !query 2 output
@@ -35,7 +35,7 @@ struct<>
-- !query 4
-CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet
+CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json
-- !query 4 schema
struct<>
-- !query 4 output
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ae1e7e7..47203a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -151,9 +151,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+ DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+ DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d2a6ef7..85a6a77 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2000,4 +2000,38 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(setOfPath.size() == pathSizeToDeleteOnExit)
}
}
+
+ test("SPARK-21912 ORC/Parquet table should not create invalid column names") {
+ Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name =>
+ withTable("t21912") {
+ Seq("ORC", "PARQUET").foreach { source =>
+ val m = intercept[AnalysisException] {
+ sql(s"CREATE TABLE t21912(`col$name` INT) USING $source")
+ }.getMessage
+ assert(m.contains(s"contains invalid character(s)"))
+
+ val m2 = intercept[AnalysisException] {
+ sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
+ }.getMessage
+ assert(m2.contains(s"contains invalid character(s)"))
+
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+ val m3 = intercept[AnalysisException] {
+ sql(s"CREATE TABLE t21912(`col$name` INT) USING hive OPTIONS (fileFormat '$source')")
+ }.getMessage
+ assert(m3.contains(s"contains invalid character(s)"))
+ }
+ }
+
+ // TODO: After SPARK-21929, we need to check ORC, too.
+ Seq("PARQUET").foreach { source =>
+ sql(s"CREATE TABLE t21912(`col` INT) USING $source")
+ val m = intercept[AnalysisException] {
+ sql(s"ALTER TABLE t21912 ADD COLUMNS(`col$name` INT)")
+ }.getMessage
+ assert(m.contains(s"contains invalid character(s)"))
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org