You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/18 11:20:00 UTC

[jira] [Commented] (SPARK-19228) inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored

    [ https://issues.apache.org/jira/browse/SPARK-19228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723960#comment-16723960 ] 

ASF GitHub Bot commented on SPARK-19228:
----------------------------------------

HyukjinKwon closed pull request #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateFormat for meet the ISO8601
URL: https://github.com/apache/spark/pull/21363
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 80f15053005ff..9eaf6a2862a0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
+import java.time.LocalDateTime
+import java.time.temporal.ChronoField
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
@@ -143,6 +145,12 @@ object DateTimeUtils {
     millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
   }
 
+  def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
+    val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
+    val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
+    epochSecond * 1000000L + microOfSecond
+  }
+
   def dateToString(days: SQLDate): String =
     getThreadLocalDateFormat.format(toJavaDate(days))
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index cbf6106697f30..cd1b7395b97d5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
 import java.util.{Calendar, Locale, TimeZone}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
+import org.junit.Assert.assertEquals
 
 class DateTimeUtilsSuite extends SparkFunSuite {
 
@@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("Java 8 LocalDateTime to microseconds") {
+    val nanos = "2015-05-09 00:10:23.999750987"
+    var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+    val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter)
+    val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, TimeZonePST)
+    assertEquals(1431155423999750L, timeInMicros)
+    val micros = "2015-05-09 00:10:23.999750"
+    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
+    val localDateTimeInMicros = LocalDateTime.parse(micros, formatter)
+    assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, TimeZonePST))
+  }
+
   test("daysToMillis and millisToDays") {
     val c = Calendar.getInstance(TimeZonePST)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index a585cbed2551b..6239f5666cd4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
           // DecimalTypes have different precisions and scales, so we try to find the common type.
           findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
         case DoubleType => tryParseDouble(field, options)
+        case DateType => tryParseDate(field, options)
         case TimestampType => tryParseTimestamp(field, options)
         case BooleanType => tryParseBoolean(field, options)
         case StringType => StringType
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
+    } else {
+      tryParseDate(field, options)
+    }
+  }
+
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestamp(field, options)
     }
   }
 
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case infers a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwards compatibility.
@@ -216,6 +226,9 @@ private[csv] object CSVInferSchema {
       } else {
         Some(DecimalType(range + scale, scale))
       }
+    // By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
+    case (t1: DateType, t2: TimestampType) => Some(TimestampType)
+    case (t1: TimestampType, t2: DateType) => Some(TimestampType)
 
     case _ => None
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 1066d156acd74..fae56744feb7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
+import java.time.format.{DateTimeFormatter, ResolverStyle}
 import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -119,7 +119,6 @@ class CSVOptions(
   val positiveInf = parameters.getOrElse("positiveInf", "Inf")
   val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
 
-
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
     name.map(CompressionCodecs.getCodecClassName)
@@ -128,13 +127,20 @@ class CSVOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
+
+  val timestampFormat: String = parameters
+    .getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
+  @transient lazy val dateFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(dateFormat)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }
+
+  @transient lazy val timestampFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(timestampFormat)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 99557a1ceb0c8..2ddd0d6dad089 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.InputStream
 import java.math.BigDecimal
+import java.time.{LocalDate, LocalDateTime}
+import java.time.temporal.ChronoField
 
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -131,9 +133,8 @@ class UnivocityParser(
 
     case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
+        Try(DateTimeUtils.dateTimeToMicroseconds(LocalDateTime
+          .parse(datum, options.timestampFormatter), options.timeZone))
           .getOrElse {
           // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
           // compatibility.
@@ -143,9 +144,8 @@ class UnivocityParser(
 
     case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+        Try(Math.toIntExact(LocalDate.parse(datum, options.dateFormatter)
+          .getLong(ChronoField.EPOCH_DAY)))
           .getOrElse {
           // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
           // compatibility.
diff --git a/sql/core/src/test/resources/test-data/dates-and-timestamps.csv b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
new file mode 100644
index 0000000000000..0a9a4c2f8566c
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
@@ -0,0 +1,4 @@
+timestamp,date
+26/08/2015 22:31:46.913,27/09/2015
+27/10/2014 22:33:31.601,26/12/2016
+28/01/2016 22:33:52.888,28/01/2017
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 661742087112f..c8cfc07a4a1dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
   }
 
-  test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
+  test("Timestamp field types are inferred correctly via custom date format") {
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
     options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 
+  test("Date field types are inferred correctly via custom date and timestamp format") {
+    val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
+    assert(CSVInferSchema.inferField(TimestampType,
+      "28/01/2017 22:31:46.913", options) == TimestampType)
+    assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
+  }
+
   test("Timestamp field types are inferred correctly from other types") {
     val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
@@ -111,7 +119,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-MM"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 07e6c74b14d0d..b571b9430d953 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
 
@@ -565,6 +566,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(results.toSeq.map(_.toSeq) === expected)
   }
 
+  test("inferring timestamp types and date types via custom formats") {
+    val options = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
+      "dateFormat" -> "dd/MM/yyyy")
+    val results = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+    assert(results.schema{0}.dataType===TimestampType)
+    assert(results.schema{1}.dataType===DateType)
+    val timestamps = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("timestamp")
+      .collect()
+    val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
+    val timestampExpected =
+      Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
+    assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
+    val dates = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("date")
+      .collect()
+    val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
+    val dateExpected =
+      Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
+        Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
+        Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
+    assert(dates.toSeq.map(_.toSeq) === dateExpected)
+  }
+
   test("load date types via custom date format") {
     val customSchema = new StructType(Array(StructField("date", DateType, true)))
     val options = Map(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index efbf73534bd19..257a683ee9d9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.math.BigDecimal
-import java.util.Locale
+import java.time.{LocalDate, LocalDateTime}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -107,20 +107,26 @@ class UnivocityParserSuite extends SparkFunSuite {
     assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
 
     val timestampsOptions =
-      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm"), "GMT")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+
+    val expectedTime = LocalDateTime.parse(customTimestamp, timestampsOptions.timestampFormatter)
+      .atZone(options.timeZone.toZoneId)
+      .toInstant.toEpochMilli
     val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
+      parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
-    val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
-    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val customDate = "31/01/2015"
+
+    val expectedDate = LocalDate.parse(customDate, dateOptions.dateFormatter)
+      .atStartOfDay(options.timeZone.toZoneId)
+      .toInstant.toEpochMilli
     val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
-        .apply(customTimestamp)
+      parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+        .apply(customDate)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19228
>                 URL: https://issues.apache.org/jira/browse/SPARK-19228
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.1.0
>            Reporter: Sergey Rubtsov
>            Priority: Major
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> Current FastDateFormat parser can't properly parse date and timestamp and does not meet the ISO8601.
>  For example, I need to process user.csv like this:
> {code:java}
> id,project,started,ended
> sergey.rubtsov,project0,12/12/2012,10/10/2015
> {code}
> When I add date format options:
> {code:java}
>         Dataset<Row> users = spark.read().format("csv").option("mode", "PERMISSIVE").option("header", "true")
>                                 .option("inferSchema", "true").option("dateFormat", "dd/MM/yyyy").load("src/main/resources/user.csv");
> 		users.printSchema();
> {code}
> expected scheme should be
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: date (nullable = true)
>  |-- ended: date (nullable = true)
> {code}
> but the actual result is:
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: string (nullable = true)
>  |-- ended: string (nullable = true)
> {code}
> This mean that date processed as string and "dateFormat" option is ignored.
>  If I add option
> {code:java}
> .option("timestampFormat", "dd/MM/yyyy")
> {code}
> result is:
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: timestamp (nullable = true)
>  |-- ended: timestamp (nullable = true)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org