You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/14 05:34:14 UTC

[spark] branch branch-2.4 updated: [SPARK-31968][SQL] Duplicate partition columns check when writing data

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e44190a  [SPARK-31968][SQL] Duplicate partition columns check when writing data
e44190a is described below

commit e44190a0812db4c4cf8ee107d17b76436cef7a9d
Author: TJX2014 <xi...@gmail.com>
AuthorDate: Sat Jun 13 22:21:35 2020 -0700

    [SPARK-31968][SQL] Duplicate partition columns check when writing data
    
    ### What changes were proposed in this pull request?
    A unit test is added
    A duplicate partition column check is added in `org.apache.spark.sql.execution.datasources.PartitioningUtils#validatePartitionColumn`.
    
    ### Why are the changes needed?
    When people write data with duplicate partition columns, it causes an `org.apache.spark.sql.AnalysisException: Found duplicate column ...` when loading the written data.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    It will prevent people from using duplicate partition columns to write data.
    1. Before the PR:
    `df.write.partitionBy("b", "b").csv("file:///tmp/output")` appears to succeed,
    but an exception is thrown when the data is read back:
    `spark.read.csv("file:///tmp/output").show()`
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the partition schema: `b`;
    2. After the PR:
    `df.write.partitionBy("b", "b").csv("file:///tmp/output")` will trigger the exception:
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) b, b: `b`;
    
    ### How was this patch tested?
    Unit test.
    
    Closes #28814 from TJX2014/master-SPARK-31968.
    
    Authored-by: TJX2014 <xi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit a4ea599b1b9b8ebaae0100b54e6ac1d7576c6d8c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/datasources/PartitioningUtils.scala    |  3 +++
 .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala   | 10 +++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 41b0bb6..44ff300 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -507,6 +507,9 @@ object PartitioningUtils {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
+    SchemaUtils.checkColumnNameDuplication(
+      partitionColumns, partitionColumns.mkString(", "), caseSensitive)
+
     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 27c983f..a4d9238 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
@@ -156,4 +156,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-31968: duplicate partition columns check") {
+    withTempPath { f =>
+      val e = intercept[AnalysisException](
+        Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath))
+      assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org