Posted to commits@spark.apache.org by ue...@apache.org on 2017/09/23 15:05:28 UTC

spark git commit: [SPARK-22109][SQL] Resolves type conflicts between strings and timestamps in partition column

Repository: spark
Updated Branches:
  refs/heads/master 50ada2a4d -> 04975a68b


[SPARK-22109][SQL] Resolves type conflicts between strings and timestamps in partition column

## What changes were proposed in this pull request?

This PR proposes to resolve type conflicts between strings and timestamps in partition column values.
It looks like we need to set the timezone, since resolving the conflict requires a cast between strings and timestamps.

```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```
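
The failure boils down to `Cast` being a `TimeZoneAwareExpression`: evaluating a cast between strings and timestamps without a timezone id unwraps an unset `Option`. A minimal sketch against the catalyst internals (the timestamp value is illustrative):

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{StringType, TimestampType}

val ts = Literal.create(java.sql.Timestamp.valueOf("2015-01-01 00:00:00"), TimestampType)

// No timezone id: evaluation hits TimeZoneAwareExpression.timeZone and
// fails with java.util.NoSuchElementException: None.get.
// Cast(ts, StringType).eval()

// With an explicit timezone id, the same cast evaluates fine.
Cast(ts, StringType, Some(TimeZone.getDefault.getID)).eval()
```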

**Before**

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
  at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```

**After**

```
+---+-------------------+
|  i|                str|
+---+-------------------+
|  2|2014-01-01 00:00:00|
|  1|2015-01-01 00:00:00|
|  3|               blah|
+---+-------------------+
```
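
Since `"blah"` cannot be parsed as a timestamp, the conflict is resolved by up-casting the partition column to string. One way to confirm (expected output sketched in the comments):

```scala
spark.read.parquet(path).printSchema()
// root
//  |-- i: integer (nullable = true)
//  |-- str: string (nullable = true)
```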

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite`, plus manual tests.
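
For reference, one way to run the suite locally (assuming a standard Spark checkout):

```
build/sbt "sql/test-only *ParquetPartitionDiscoverySuite"
```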

Author: hyukjinkwon <gu...@gmail.com>

Closes #19331 from HyukjinKwon/SPARK-22109.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04975a68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04975a68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04975a68

Branch: refs/heads/master
Commit: 04975a68b583a6175f93da52374108e5d4754d9a
Parents: 50ada2a
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Sep 24 00:05:17 2017 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Sun Sep 24 00:05:17 2017 +0900

----------------------------------------------------------------------
 .../sql/execution/datasources/PartitioningUtils.scala   | 11 ++++++-----
 .../parquet/ParquetPartitionDiscoverySuite.scala        | 12 ++++++++++++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04975a68/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 92358da..1c00c9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -139,7 +139,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -318,7 +318,8 @@ object PartitioningUtils {
    * }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -333,7 +334,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -470,7 +471,7 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty string
@@ -478,7 +479,7 @@ object PartitioningUtils {
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
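
Condensed, this is the shape of the change in `resolveTypeConflicts`: pick the "highest" type among a column's literals and cast every literal to it, now threading the timezone through so string/timestamp casts can evaluate. A simplified sketch (not the exact Spark source; the up-cast target is hard-coded here):

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{StringType, TimestampType}

val tz = TimeZone.getDefault
val literals = Seq(
  Literal.create(java.sql.Timestamp.valueOf("2015-01-01 00:00:00"), TimestampType),
  Literal.create("blah", StringType))

// StringType is the "higher" of the two conflicting types, so all
// literals are cast to it, each cast carrying an explicit timezone id.
val resolved = literals.map { l =>
  Literal.create(Cast(l, StringType, Some(tz.getID)).eval(), StringType)
}
```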

http://git-wip-us.apache.org/repos/asf/spark/blob/04975a68/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 837a087..f79b92b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }

