Posted to commits@spark.apache.org by we...@apache.org on 2021/07/20 13:09:08 UTC

[spark] branch branch-3.2 updated: [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7cd89ef  [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too
7cd89ef is described below

commit 7cd89efca5a74dcc2457c7be5f2ef65aeb90a967
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Jul 20 21:08:03 2021 +0800

    [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too
    
    ### What changes were proposed in this pull request?
    When an inner (nested) field has an invalid name, the schema check should validate that inner field name too, not just the top-level column names.
    ![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png)
    
    ### Why are the changes needed?
    Check early, fail early: invalid nested field names are now rejected when the schema is checked at DDL time, instead of surfacing later at write time.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #33409 from AngersZhuuuu/SPARK-36201.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 251885772d41a572655e950a8e298315f222a803)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/execution/command/ddl.scala | 12 ++++++------
 .../apache/spark/sql/execution/command/tables.scala  |  2 +-
 .../execution/datasources/orc/OrcFileFormat.scala    | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaConverter.scala | 10 ++++++++--
 .../spark/sql/hive/execution/HiveDDLSuite.scala      | 20 ++++++++++++++++++++
 5 files changed, 43 insertions(+), 11 deletions(-)
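
 A minimal, self-contained sketch of the recursive check both file formats gain
 in this patch (the forbidden-character rule is paraphrased from the Parquet
 error message asserted in the test below; the regex-free check and exception
 type here are illustrative, not the exact production code):

     import org.apache.spark.sql.types.StructType

     // Reject any field name containing a character Parquet/ORC cannot handle.
     def checkFieldName(name: String): Unit = {
       if (name.exists(" ,;{}()\n\t=".contains(_))) {
         throw new IllegalArgumentException(
           s"""Attribute name "$name" contains invalid character(s)""")
       }
     }

     // Walk the schema so nested struct fields are validated too.
     def checkFieldNames(schema: StructType): Unit = {
       schema.foreach { field =>
         checkFieldName(field.name)
         field.dataType match {
           case s: StructType => checkFieldNames(s)
           case _ =>
         }
       }
     }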

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 605d98e..140f9d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -924,23 +924,23 @@ object DDLUtils {
   }
 
   private[sql] def checkDataColNames(table: CatalogTable): Unit = {
-    checkDataColNames(table, table.dataSchema.fieldNames)
+    checkDataColNames(table, table.dataSchema)
   }
 
-  private[sql] def checkDataColNames(table: CatalogTable, colNames: Seq[String]): Unit = {
+  private[sql] def checkDataColNames(table: CatalogTable, schema: StructType): Unit = {
     table.provider.foreach {
       _.toLowerCase(Locale.ROOT) match {
         case HIVE_PROVIDER =>
           val serde = table.storage.serde
           if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
-            OrcFileFormat.checkFieldNames(colNames)
+            OrcFileFormat.checkFieldNames(schema)
           } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
             serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
             serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
-            ParquetSchemaConverter.checkFieldNames(colNames)
+            ParquetSchemaConverter.checkFieldNames(schema)
           }
-        case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
-        case "orc" => OrcFileFormat.checkFieldNames(colNames)
+        case "parquet" => ParquetSchemaConverter.checkFieldNames(schema)
+        case "orc" => OrcFileFormat.checkFieldNames(schema)
         case _ =>
       }
     }
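
 The effect of the ddl.scala change above: checkDataColNames now receives the
 full StructType rather than just the top-level field names, so the
 format-specific checkers can recurse. A hypothetical schema that only the new
 code catches (names are illustrative):

     import org.apache.spark.sql.types._

     val schema = StructType(Seq(
       StructField("col1", StructType(Seq(
         // Invalid inner name: contains '(', ')', '=' and ','
         StructField("IF(ID=1,ID,0)", LongType))))))
     // Previously only "col1" was checked; now the walk descends into the
     // struct and rejects the inner field name at DDL time.
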
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0599621..f740915 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -236,7 +236,7 @@ case class AlterTableAddColumnsCommand(
       (colsToAdd ++ catalogTable.schema).map(_.name),
       "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
+    DDLUtils.checkDataColNames(catalogTable, StructType(colsToAdd))
 
     val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
     catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsToAdd))
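
 With the tables.scala change, the same validation covers ALTER TABLE ... ADD
 COLUMNS, since the added columns are wrapped in a StructType before the check.
 A hedged repro (table and column names are made up):

     spark.sql("CREATE TABLE t (id BIGINT) USING parquet")
     // The inner field name "a b" contains a space, one of the characters
     // Parquet rejects, so this should now fail during the ALTER itself.
     spark.sql("ALTER TABLE t ADD COLUMNS (s STRUCT<`a b`: INT>)")
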
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index d6593ca..9024c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -52,8 +52,14 @@ private[sql] object OrcFileFormat {
     }
   }
 
-  def checkFieldNames(names: Seq[String]): Unit = {
-    names.foreach(checkFieldName)
+  def checkFieldNames(schema: StructType): Unit = {
+    schema.foreach { field =>
+      checkFieldName(field.name)
+      field.dataType match {
+        case s: StructType => checkFieldNames(s)
+        case _ =>
+      }
+    }
   }
 
   def getQuotedSchemaString(dataType: DataType): String = dataType match {
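
 Sanity-checking the new ORC entry point (schema literals are illustrative;
 note the recursion in the hunk above descends into struct fields
 specifically):

     import org.apache.spark.sql.types._

     val ok  = StructType(Seq(
       StructField("a", StructType(Seq(StructField("b", IntegerType))))))
     val bad = StructType(Seq(
       StructField("a", StructType(Seq(StructField("b,c", IntegerType))))))

     OrcFileFormat.checkFieldNames(ok)   // passes
     OrcFileFormat.checkFieldNames(bad)  // throws: "b,c" contains ','
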
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 1b26c69..a23eebe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -593,8 +593,14 @@ private[sql] object ParquetSchemaConverter {
        """.stripMargin.split("\n").mkString(" ").trim)
   }
 
-  def checkFieldNames(names: Seq[String]): Unit = {
-    names.foreach(checkFieldName)
+  def checkFieldNames(schema: StructType): Unit = {
+    schema.foreach { field =>
+      checkFieldName(field.name)
+      field.dataType match {
+        case s: StructType => checkFieldNames(s)
+        case _ =>
+      }
+    }
   }
 
   def checkConversionRequirement(f: => Boolean, message: String): Unit = {
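
 The Parquet side mirrors the ORC change exactly. Exercising it directly (a
 sketch; ParquetSchemaConverter is private[sql], so this assumes test-scope
 access):

     import org.apache.spark.sql.types._

     val nested = StructType(Seq(
       StructField("outer", StructType(Seq(StructField("in=valid", StringType))))))
     ParquetSchemaConverter.checkFieldNames(nested)
     // => AnalysisException: Attribute name "in=valid" contains invalid
     //    character(s) among " ,;{}()\n\t=". Please use alias to rename it.
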
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9a39f18..3e01fcb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3008,6 +3008,26 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-36201: Add check for inner field of parquet/orc schema") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempPath { path =>
+        val e = intercept[AnalysisException] {
+          spark.sql(
+            s"""
+               |INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
+               |STORED AS PARQUET
+               |SELECT
+               |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
+               |FROM v
+               """.stripMargin)
+        }.getMessage
+        assert(e.contains("Attribute name \"IF(ID=1,ID,0)\" contains" +
+          " invalid character(s) among \" ,;{}()\\n\\t=\". Please use alias to rename it."))
+      }
+    }
+  }
+
   test("SPARK-34261: Avoid side effect if create exists temporary function") {
     withUserDefinedFunction("f1" -> true) {
       sql("CREATE TEMPORARY FUNCTION f1 AS 'org.apache.hadoop.hive.ql.udf.UDFUUID'")
