You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/20 05:56:46 UTC
spark git commit: [SPARK-23436][SQL] Infer partition as Date only if
it can be casted to Date
Repository: spark
Updated Branches:
refs/heads/master f5850e789 -> 651b0277f
[SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date
## What changes were proposed in this pull request?
Before this patch, Spark could infer as Date a partition value that cannot be cast to Date (this can happen when there are extra characters after a valid date, like `2018-02-15AAA`).
When this happens and the input format has metadata that defines the schema of the table, `null` is returned as the value of the partition column, because the `cast` operator used in `PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value.
This PR makes partition inference check that a value can actually be cast to Date or Timestamp before inferring that data type for it.
## How was this patch tested?
Added a unit test.
Author: Marco Gaido <ma...@gmail.com>
Closes #20621 from mgaido91/SPARK-23436.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/651b0277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/651b0277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/651b0277
Branch: refs/heads/master
Commit: 651b0277fe989119932d5ae1ef729c9768aa018d
Parents: f5850e7
Author: Marco Gaido <ma...@gmail.com>
Authored: Tue Feb 20 13:56:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Feb 20 13:56:38 2018 +0800
----------------------------------------------------------------------
.../datasources/PartitioningUtils.scala | 40 +++++++++++++++-----
.../ParquetPartitionDiscoverySuite.scala | 14 +++++++
2 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 472bf82..379acb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -407,6 +407,34 @@ object PartitioningUtils {
Literal(bigDecimal)
}
+ val dateTry = Try {
+ // try and parse the date, if no exception occurs this is a candidate to be resolved as
+ // DateType
+ DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+ // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+ // This can happen since DateFormat.parse may not use the entire text of the given string:
+ // so if there are extra-characters after the date, it returns correctly.
+ // We need to check that we can cast the raw string since we later can use Cast to get
+ // the partition values with the right DataType (see
+ // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+ val dateValue = Cast(Literal(raw), DateType).eval()
+ // Disallow DateType if the cast returned null
+ require(dateValue != null)
+ Literal.create(dateValue, DateType)
+ }
+
+ val timestampTry = Try {
+ val unescapedRaw = unescapePathName(raw)
+ // try and parse the date, if no exception occurs this is a candidate to be resolved as
+ // TimestampType
+ DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+ // SPARK-23436: see comment for date
+ val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+ // Disallow TimestampType if the cast returned null
+ require(timestampValue != null)
+ Literal.create(timestampValue, TimestampType)
+ }
+
if (typeInference) {
// First tries integral types
Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +443,8 @@ object PartitioningUtils {
// Then falls back to fractional types
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
// Then falls back to date/timestamp types
- .orElse(Try(
- Literal.create(
- DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
- .parse(unescapePathName(raw)).getTime * 1000L,
- TimestampType)))
- .orElse(Try(
- Literal.create(
- DateTimeUtils.millisToDays(
- DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
- DateType)))
+ .orElse(timestampTry)
+ .orElse(dateTry)
// Then falls back to string
.getOrElse {
if (raw == DEFAULT_PARTITION_NAME) {
http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index d490264..edb3da9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1120,4 +1120,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
Row(3, BigDecimal("2" * 30)) :: Nil)
}
}
+
+ test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
+ withTempPath { path =>
+ val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
+ .toDF("id", "date_month", "date_hour", "data")
+
+ data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
+ val input = spark.read.parquet(path.getAbsolutePath).select("id",
+ "date_month", "date_hour", "data")
+
+ assert(data.schema.sameType(input.schema)) // partition values that are not valid dates must be read back as String, like the written data
+ checkAnswer(input, data)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org