You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/09 09:19:01 UTC

[spark] branch branch-3.2 updated: [SPARK-36271][SQL] Unify V1 insert check field name before prepare writter

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 2a46bf6  [SPARK-36271][SQL] Unify V1 insert check field name before prepare writter
2a46bf6 is described below

commit 2a46bf6a3b92d0d7d66dd553e38f2b129a2e7918
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Aug 9 17:18:06 2021 +0800

    [SPARK-36271][SQL] Unify V1 insert check field name before prepare writter
    
    ### What changes were proposed in this pull request?
    Unify DataSource V1 insert schema check field name before prepare writer.
    And in this PR we add check for avro V1 insert too.
    
    ### Why are the changes needed?
    Unify code and add check for avro V1 insert too.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #33566 from AngersZhuuuu/SPARK-36271.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit f3e079b09b5877c97fc10864937e76d866935880)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 22 ++++++++++++++++++++++
 .../execution/datasources/DataSourceUtils.scala    |  1 +
 .../datasources/parquet/ParquetWriteSupport.scala  |  1 -
 .../sql/execution/datasources/v2/FileWrite.scala   |  1 +
 .../datasources/v2/parquet/ParquetWrite.scala      |  1 +
 .../spark/sql/FileBasedDataSourceSuite.scala       | 22 ++++++++++++++++++++++
 6 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f93c61a..78f08f7 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2195,6 +2195,28 @@ class AvroV1Suite extends AvroSuite {
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "avro")
+
+  test("SPARK-36271: V1 insert should check schema field name too") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite)
+            .format("avro").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS col1 FROM v")
+            .write.mode(SaveMode.Overwrite)
+            .format("avro").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+    }
+  }
 }
 
 class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index b562d44..fcd95a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -84,6 +84,7 @@ object DataSourceUtils {
         throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(format.toString, field)
       }
     }
+    checkFieldNames(format, schema)
   }
 
   // SPARK-24626: Metadata files and temporary files should not be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 20f69e8..d0cd02f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -482,7 +482,6 @@ object ParquetWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
 
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
-    ParquetSchemaConverter.checkFieldNames(schema)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index 9c4bc78..7f66a09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -93,6 +93,7 @@ trait FileWrite extends Write {
       s"when inserting into $pathName", caseSensitiveAnalysis)
     DataSource.validateSchema(schema)
 
+    // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
     schema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
         throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
index 0316d91..b2b6d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
@@ -72,6 +72,7 @@ case class ParquetWrite(
 
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
+    ParquetSchemaConverter.checkFieldNames(dataSchema)
     // This metadata is useful for keeping UDTs like Vector/Matrix.
     ParquetWriteSupport.setSchema(dataSchema, conf)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index c71f667..2436392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -967,6 +967,28 @@ class FileBasedDataSourceSuite extends QueryTest
       checkAnswer(df, Row("v1", "v2"))
     }
   }
+
+  test("SPARK-36271: V1 insert should check schema field name too") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite)
+            .format("parquet").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+
+      withTempDir { dir =>
+        val e = intercept[AnalysisException] {
+          sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS col1 FROM v")
+            .write.mode(SaveMode.Overwrite)
+            .format("parquet").save(dir.getCanonicalPath)
+        }.getMessage
+        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
+      }
+    }
+  }
 }
 
 object TestingUDT {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org