Posted to commits@spark.apache.org by ue...@apache.org on 2017/09/23 17:51:23 UTC

spark git commit: [SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 1a829df94 -> 211d81beb


[SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column

## What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/commit/04975a68b583a6175f93da52374108e5d4754d9a into branch-2.2.

## How was this patch tested?

Unit tests in `ParquetPartitionDiscoverySuite`.
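
For context (this sketch is not part of the original patch): the conflict being fixed shows up when a partition column mixes timestamp-shaped strings with plain strings, exactly as in the test added below. A minimal reproduction, assuming a Spark 2.2 session bound to `spark` and a hypothetical output path:

    import spark.implicits._

    val df = Seq(
      (1, "2015-01-01 00:00:00"),
      (2, "2014-01-01 00:00:00"),
      (3, "blah")).toDF("i", "str")

    // "2015-01-01 00:00:00" and "2014-01-01 00:00:00" are inferred as timestamp
    // partition values while "blah" stays a string, so partition discovery has to
    // up-cast the timestamp literals back to string when the table is read.
    df.write.format("parquet").partitionBy("str").save("/tmp/spark-22109-demo")
    spark.read.load("/tmp/spark-22109-demo").printSchema()

Before this patch, the read fails while resolving the partition schema, because the timestamp-to-string Cast is evaluated without a timezone; with it, `str` comes back as a string column.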

Author: hyukjinkwon <gu...@gmail.com>

Closes #19333 from HyukjinKwon/SPARK-22109-backport-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/211d81be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/211d81be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/211d81be

Branch: refs/heads/branch-2.2
Commit: 211d81beb001a113e262d399fcacbd72f33ea0d9
Parents: 1a829df
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Sep 24 02:51:04 2017 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Sun Sep 24 02:51:04 2017 +0900

----------------------------------------------------------------------
 .../sql/execution/datasources/PartitioningUtils.scala   | 11 ++++++-----
 .../parquet/ParquetPartitionDiscoverySuite.scala        | 12 ++++++++++++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/211d81be/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f61c673..6f74381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -138,7 +138,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -322,7 +322,8 @@ object PartitioningUtils {
    * }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -337,7 +338,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -474,7 +475,7 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty string
@@ -482,7 +483,7 @@ object PartitioningUtils {
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
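
The last hunk above is the core of the change: in Spark 2.2, casting a timestamp to a string is timezone-aware, so evaluating `Cast` without a timezone id fails, and the patch threads the session timezone from `parsePartitions` through `resolvePartitions` down to that cast. A minimal sketch of the repaired call, using the catalyst classes named in the hunk (the literal value and timezone here are illustrative, not from the patch):

    import java.util.TimeZone
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.StringType

    // A timestamp literal like the ones partition discovery infers from
    // timestamp-shaped directory values.
    val ts = Literal(java.sql.Timestamp.valueOf("2015-01-01 00:00:00"))

    // With no timezone id there is nothing to format the timestamp with;
    // passing one, as the patched resolveTypeConflicts now does, lets the
    // cast evaluate to the string form of the value.
    val timeZone = TimeZone.getDefault
    Cast(ts, StringType, Some(timeZone.getID)).eval()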

http://git-wip-us.apache.org/repos/asf/spark/blob/211d81be/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b4f3de9..7225693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }

