Posted to commits@spark.apache.org by do...@apache.org on 2020/06/14 05:27:56 UTC
[spark] branch branch-3.0 updated: [SPARK-31968][SQL] Duplicate partition columns check when writing data
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e2cde45 [SPARK-31968][SQL] Duplicate partition columns check when writing data
e2cde45 is described below
commit e2cde45a597189c64dbde7f0d6e2c25001578e22
Author: TJX2014 <xi...@gmail.com>
AuthorDate: Sat Jun 13 22:21:35 2020 -0700
[SPARK-31968][SQL] Duplicate partition columns check when writing data
### What changes were proposed in this pull request?
A duplicate partition column check is added in `org.apache.spark.sql.execution.datasources.PartitioningUtils#validatePartitionColumn`, together with a unit test.
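For illustration, the semantics of the new call (`SchemaUtils` is a Spark-internal utility, so this snippet only compiles inside Spark's own source tree; it mirrors the diff below):

```scala
import org.apache.spark.sql.util.SchemaUtils

// Mirrors the check added to validatePartitionColumn (see the diff below).
// With duplicates present, this throws:
//   org.apache.spark.sql.AnalysisException: Found duplicate column(s) b, b: `b`;
val partitionColumns = Seq("b", "b")
SchemaUtils.checkColumnNameDuplication(
  partitionColumns, partitionColumns.mkString(", "), false /* caseSensitive */)
```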
### Why are the changes needed?
When data is written with duplicate partition columns, the write itself succeeds, but loading the written data back fails with `org.apache.spark.sql.AnalysisException: Found duplicate column ...`.
### Does this PR introduce _any_ user-facing change?
Yes.
It prevents users from writing data with duplicate partition columns.
1. Before the PR:
`df.write.partitionBy("b", "b").csv("file:///tmp/output")` appears to succeed,
but reading the output back with `spark.read.csv("file:///tmp/output").show()` fails with:
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the partition schema: `b`;
2. After the PR:
`df.write.partitionBy("b", "b").csv("file:///tmp/output")` itself fails with:
org.apache.spark.sql.AnalysisException: Found duplicate column(s) b, b: `b`;
(A runnable end-to-end reproduction follows.)
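Putting the two cases together, a minimal spark-shell reproduction (path and data follow the examples above and the test in this patch):

```scala
val df = Seq((3, 2)).toDF("a", "b")

// Before this patch: the write appears to succeed...
df.write.partitionBy("b", "b").csv("file:///tmp/output")
// ...but reading the data back throws:
//   org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the partition schema: `b`;
spark.read.csv("file:///tmp/output").show()

// After this patch, the write itself throws:
//   org.apache.spark.sql.AnalysisException: Found duplicate column(s) b, b: `b`;
```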
### How was this patch tested?
Unit test.
Closes #28814 from TJX2014/master-SPARK-31968.
Authored-by: TJX2014 <xi...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit a4ea599b1b9b8ebaae0100b54e6ac1d7576c6d8c)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/datasources/PartitioningUtils.scala | 3 +++
.../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 10 +++++++++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f7e225b..5846d46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -545,6 +545,9 @@ object PartitioningUtils {
partitionColumns: Seq[String],
caseSensitive: Boolean): Unit = {
+ SchemaUtils.checkColumnNameDuplication(
+ partitionColumns, partitionColumns.mkString(", "), caseSensitive)
+
partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
field => field.dataType match {
case _: AtomicType => // OK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 1c4e2a9..6df1c5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.TestUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
@@ -156,4 +156,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-31968: duplicate partition columns check") {
+ withTempPath { f =>
+ val e = intercept[AnalysisException](
+ Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath))
+ assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
+ }
+ }
}
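For readers without the Spark source at hand, the duplicate detection this patch delegates to amounts to a case-normalized group-by over the column names. A self-contained sketch of that logic (an approximation, not the exact `SchemaUtils.checkColumnNameDuplication` source; it throws `IllegalArgumentException` instead of Spark's `AnalysisException` to stay dependency-free):

```scala
// Approximate sketch of the duplicate-name check (not the exact Spark source).
def checkColumnNameDuplication(
    columnNames: Seq[String], colType: String, caseSensitive: Boolean): Unit = {
  // Compare case-insensitively unless the analysis is case sensitive.
  val names = if (caseSensitive) columnNames else columnNames.map(_.toLowerCase)
  if (names.distinct.length != names.length) {
    // Report each name that occurs more than once, e.g. Seq("b", "b") => `b`.
    val duplicates = names.groupBy(identity).collect {
      case (name, occurrences) if occurrences.size > 1 => s"`$name`"
    }
    throw new IllegalArgumentException(
      s"Found duplicate column(s) $colType: ${duplicates.mkString(", ")}")
  }
}

// checkColumnNameDuplication(Seq("b", "b"), "b, b", caseSensitive = false)
// ==> Found duplicate column(s) b, b: `b`
```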