You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ue...@apache.org on 2017/09/23 17:51:23 UTC
spark git commit: [SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column
Repository: spark
Updated Branches:
refs/heads/branch-2.2 1a829df94 -> 211d81beb
[SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column
## What changes were proposed in this pull request?
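This PR backports https://github.com/apache/spark/commit/04975a68b583a6175f93da52374108e5d4754d9a into branch-2.2. The change threads the time zone from `resolvePartitions` into `resolveTypeConflicts`, so that the `Cast` used to up-cast conflicting partition values is given a time zone ID.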
## How was this patch tested?
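A new unit test in `ParquetPartitionDiscoverySuite` writes a Parquet table partitioned by a string column that mixes timestamp-like values with a non-timestamp string, then checks that reading it back returns the original rows.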
Author: hyukjinkwon <gu...@gmail.com>
Closes #19333 from HyukjinKwon/SPARK-22109-backport-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/211d81be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/211d81be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/211d81be
Branch: refs/heads/branch-2.2
Commit: 211d81beb001a113e262d399fcacbd72f33ea0d9
Parents: 1a829df
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Sep 24 02:51:04 2017 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Sun Sep 24 02:51:04 2017 +0900
----------------------------------------------------------------------
.../sql/execution/datasources/PartitioningUtils.scala | 11 ++++++-----
.../parquet/ParquetPartitionDiscoverySuite.scala | 12 ++++++++++++
2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/211d81be/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f61c673..6f74381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -138,7 +138,7 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")
- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+ val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
// Creates the StructType which represents the partition columns.
val fields = {
@@ -322,7 +322,8 @@ object PartitioningUtils {
* }}}
*/
def resolvePartitions(
- pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+ pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+ timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
@@ -337,7 +338,7 @@ object PartitioningUtils {
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
- resolveTypeConflicts(values.map(_.literals(i)))
+ resolveTypeConflicts(values.map(_.literals(i)), timeZone)
}
// Fills resolved literals back to each partition
@@ -474,7 +475,7 @@ object PartitioningUtils {
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
*/
- private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+ private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
val desiredType = {
val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
// Falls back to string if all values of this column are null or empty string
@@ -482,7 +483,7 @@ object PartitioningUtils {
}
literals.map { case l @ Literal(_, dataType) =>
- Literal.create(Cast(l, desiredType).eval(), desiredType)
+ Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/211d81be/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b4f3de9..7225693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}
+
+ test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+ val df = Seq(
+ (1, "2015-01-01 00:00:00"),
+ (2, "2014-01-01 00:00:00"),
+ (3, "blah")).toDF("i", "str")
+
+ withTempPath { path =>
+ df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+ checkAnswer(spark.read.load(path.getAbsolutePath), df)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org