Posted to commits@spark.apache.org by we...@apache.org on 2018/02/20 05:56:46 UTC

spark git commit: [SPARK-23436][SQL] Infer partition as Date only if it can be cast to Date

Repository: spark
Updated Branches:
  refs/heads/master f5850e789 -> 651b0277f


[SPARK-23436][SQL] Infer partition as Date only if it can be cast to Date

## What changes were proposed in this pull request?

Before this patch, Spark could infer DateType for a partition value that cannot actually be cast to Date (this can happen when there are extra characters after a valid date, such as `2018-02-15AAA`).
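For illustration, here is a minimal sketch (not part of the patch) of why such a value parses at all: `java.text.DateFormat.parse(String)` stops consuming input at the first character it cannot interpret, so trailing garbage does not raise a `ParseException`.

```scala
import java.text.SimpleDateFormat

val format = new SimpleDateFormat("yyyy-MM-dd")
// Succeeds and returns the date 2018-02-15; the trailing "AAA" is silently ignored.
val parsed = format.parse("2018-02-15AAA")
```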

When this happens and the input format has metadata that defines the schema of the table, `null` is returned as the value for the partition column, because the `cast` operator used in `PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value.
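A hedged sketch of that failure mode, using Catalyst's internal `Cast` expression (assuming a Spark 2.3-era classpath):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.DateType

// The lenient parser above accepts "2018-02-15AAA", but the stricter Cast
// rejects it: eval() yields null instead of a days-since-epoch value.
val result = Cast(Literal("2018-02-15AAA"), DateType).eval()
assert(result == null)
```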

This PR changes partition inference to check that a value can actually be cast to Date or Timestamp before inferring that data type for it.
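A simplified sketch of the guard for the date case (the diff below contains the real code; the helper name `tryInferDate` is hypothetical):

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.DateType

// Only infer DateType when the raw partition value survives an actual cast.
def tryInferDate(raw: String): Try[Literal] = Try {
  val dateValue = Cast(Literal(raw), DateType).eval()
  require(dateValue != null) // reject values that Cast cannot convert
  Literal.create(dateValue, DateType)
}
```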

## How was this patch tested?

Added a unit test.

Author: Marco Gaido <ma...@gmail.com>

Closes #20621 from mgaido91/SPARK-23436.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/651b0277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/651b0277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/651b0277

Branch: refs/heads/master
Commit: 651b0277fe989119932d5ae1ef729c9768aa018d
Parents: f5850e7
Author: Marco Gaido <ma...@gmail.com>
Authored: Tue Feb 20 13:56:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Feb 20 13:56:38 2018 +0800

----------------------------------------------------------------------
 .../datasources/PartitioningUtils.scala         | 40 +++++++++++++++-----
 .../ParquetPartitionDiscoverySuite.scala        | 14 +++++++
 2 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 472bf82..379acb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -407,6 +407,34 @@ object PartitioningUtils {
       Literal(bigDecimal)
     }
 
+    val dateTry = Try {
+      // Try to parse the date; if no exception occurs, this is a candidate to be
+      // resolved as DateType.
+      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // This can happen because DateFormat.parse may not use the entire text of the given
+      // string: if there are extra characters after the date, parsing still succeeds.
+      // We need to check that we can cast the raw string, since Cast is later used to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val dateValue = Cast(Literal(raw), DateType).eval()
+      // Disallow DateType if the cast returned null
+      require(dateValue != null)
+      Literal.create(dateValue, DateType)
+    }
+
+    val timestampTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // Try to parse the timestamp; if no exception occurs, this is a candidate to be
+      // resolved as TimestampType.
+      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      // SPARK-23436: see comment for date
+      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+      // Disallow TimestampType if the cast returned null
+      require(timestampValue != null)
+      Literal.create(timestampValue, TimestampType)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +443,8 @@ object PartitioningUtils {
         // Then falls back to fractional types
         .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
         // Then falls back to date/timestamp types
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
-              .parse(unescapePathName(raw)).getTime * 1000L,
-            TimestampType)))
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.millisToDays(
-              DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
-            DateType)))
+        .orElse(timestampTry)
+        .orElse(dateTry)
         // Then falls back to string
         .getOrElse {
           if (raw == DEFAULT_PARTITION_NAME) {

http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index d490264..edb3da9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1120,4 +1120,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
           Row(3, BigDecimal("2" * 30)) :: Nil)
     }
   }
+
+  test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
+    withTempPath { path =>
+      val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
+        .toDF("id", "date_month", "date_hour", "data")
+
+      data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
+      val input = spark.read.parquet(path.getAbsolutePath).select("id",
+        "date_month", "date_hour", "data")
+
+      assert(input.schema.sameType(data.schema))
+      checkAnswer(input, data)
+    }
+  }
 }

