You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/25 04:19:39 UTC

spark git commit: [SPARK-16216][SQL][BRANCH-2.0] Backport Read/write dateFormat/timestampFormat options for CSV and JSON

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9f363a690 -> 3258f27a8


[SPARK-16216][SQL][BRANCH-2.0] Backport Read/write dateFormat/timestampFormat options for CSV and JSON

## What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/14279 to 2.0.

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14799 from HyukjinKwon/SPARK-16216-json-csv-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3258f27a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3258f27a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3258f27a

Branch: refs/heads/branch-2.0
Commit: 3258f27a881dfeb5ab8bae90c338603fa4b6f9d8
Parents: 9f363a6
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Aug 24 21:19:35 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Aug 24 21:19:35 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  56 +++++--
 python/pyspark/sql/streaming.py                 |  30 +++-
 .../org/apache/spark/sql/DataFrameReader.scala  |  17 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  12 ++
 .../datasources/csv/CSVInferSchema.scala        |  42 ++---
 .../execution/datasources/csv/CSVOptions.scala  |  15 +-
 .../execution/datasources/csv/CSVRelation.scala |  43 ++++-
 .../datasources/json/JSONOptions.scala          |   9 ++
 .../datasources/json/JacksonGenerator.scala     |  14 +-
 .../datasources/json/JacksonParser.scala        |  68 ++++----
 .../datasources/json/JsonFileFormat.scala       |   5 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  16 +-
 .../datasources/csv/CSVInferSchemaSuite.scala   |   4 +-
 .../execution/datasources/csv/CSVSuite.scala    | 156 ++++++++++++++++++-
 .../datasources/csv/CSVTypeCastSuite.scala      |  17 +-
 .../execution/datasources/json/JsonSuite.scala  |  74 ++++++++-
 .../datasources/json/TestJsonData.scala         |   6 +
 .../sql/sources/JsonHadoopFsRelationSuite.scala |   4 +
 18 files changed, 478 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e..3da6f49 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
         (one object per record) and returns the result as a :class`DataFrame`.
@@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils):
                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                           it uses the value specified in
                                           ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils):
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils):
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils):
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             path = [path]
@@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
-        self._set_opts(compression=compression)
+        self._set_opts(
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.json(path)
 
     @since(1.4)
@@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils):
 
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
-            header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+            timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils):
                        the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                           the default value, empty string.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
-                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+                       dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555..3761d2b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+             timestampFormat=None):
         """
         Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
 
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                           it uses the value specified in
                                           ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e8c2885..083c2e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -280,6 +280,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`columnNameOfCorruptRecord` (default is the value specified in
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
    * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
    * @since 2.0.0
    */
@@ -376,10 +382,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * value.</li>
    * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
    * value.</li>
-   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type. If parsing fails, it falls
+   * back to `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12b3046..767af99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -452,6 +452,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 1.4.0
    */
@@ -544,6 +550,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index de3d889..f1b4c11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -139,20 +139,14 @@ private[csv] object CSVInferSchema {
   }
 
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    if (options.dateFormat != null) {
-      // This case infers a custom `dataFormat` is set.
-      if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
-    } else {
+    // Try the `timestampFormat` option first (custom or default pattern).
+    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+      TimestampType
+    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwords competibility.
-      if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
+      TimestampType
+    } else {
+      tryParseBoolean(field, options)
     }
   }
 
@@ -277,18 +271,24 @@ private[csv] object CSVTypeCast {
           val value = new BigDecimal(datum.replaceAll(",", ""))
           Decimal(value, dt.precision, dt.scale)
         }
-      case _: TimestampType if options.dateFormat != null =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        options.dateFormat.parse(datum).getTime * 1000L
       case _: TimestampType =>
         // This one will lose microseconds parts.
         // See https://issues.apache.org/jira/browse/SPARK-10681.
-        DateTimeUtils.stringToTime(datum).getTime  * 1000L
-      case _: DateType if options.dateFormat != null =>
-        DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+        Try(options.timestampFormat.parse(datum).getTime * 1000L)
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.stringToTime(datum).getTime  * 1000L
+          }
       case _: DateType =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
+        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+          }
       case _: StringType => UTF8String.fromString(datum)
       case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 10fe541..364d7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
-import java.text.SimpleDateFormat
+
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -101,11 +102,13 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  // Share date format object as it is expensive to parse date pattern.
-  val dateFormat: SimpleDateFormat = {
-    val dateFormat = parameters.get("dateFormat")
-    dateFormat.map(new SimpleDateFormat(_)).orNull
-  }
+  // Uses `FastDateFormat`, a thread-safe drop-in replacement for `SimpleDateFormat`.
+  val dateFormat: FastDateFormat =
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
 
   val maxColumns = getInt("maxColumns", 20480)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 10d84f4..d0d5ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.types._
@@ -179,6 +180,14 @@ private[csv] class CsvOutputWriter(
   // create the Generator without separator inserted between 2 records
   private[this] val text = new Text()
 
+  // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+  // When the value is null, this converter should not be called.
+  private type ValueConverter = (InternalRow, Int) => String
+
+  // `ValueConverter`s for all values in the fields of the schema
+  private val valueConverters: Array[ValueConverter] =
+    dataSchema.map(_.dataType).map(makeConverter).toArray
+
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -195,18 +204,40 @@ private[csv] class CsvOutputWriter(
   private var records: Long = 0L
   private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
 
-  private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
-    if (field != null) {
-      field.toString
-    } else {
-      params.nullValue
+  private def rowToString(row: InternalRow): Seq[String] = {
+    var i = 0
+    val values = new Array[String](row.numFields)
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        values(i) = valueConverters(i).apply(row, i)
+      } else {
+        values(i) = params.nullValue
+      }
+      i += 1
     }
+    values
+  }
+
+  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+    case DateType =>
+      (row: InternalRow, ordinal: Int) =>
+        params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+
+    case TimestampType =>
+      (row: InternalRow, ordinal: Int) =>
+        params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+
+    case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
+
+    case dt: DataType =>
+      (row: InternalRow, ordinal: Int) =>
+        row.get(ordinal, dt).toString
   }
 
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
+    csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag)
     records += 1
     if (records % FLUSH_BATCH_SIZE == 0) {
       flush()

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 66f1126..02d211d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -53,6 +54,14 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
+  // Uses `FastDateFormat`, which is a thread-safe direct replacement for `SimpleDateFormat`.
+  val dateFormat: FastDateFormat =
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
     logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 8b920ec..800d43f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -32,11 +32,17 @@ private[sql] object JacksonGenerator {
    * @param gen a JsonGenerator object
    * @param row The row to convert
    */
-  def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
+  def apply(
+      rowSchema: StructType,
+      gen: JsonGenerator,
+      options: JSONOptions = new JSONOptions(Map.empty[String, String]))
+      (row: InternalRow): Unit = {
     def valWriter: (DataType, Any) => Unit = {
       case (_, null) | (NullType, _) => gen.writeNull()
       case (StringType, v) => gen.writeString(v.toString)
-      case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString)
+      case (TimestampType, v: Long) =>
+        val timestampString = options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(v))
+        gen.writeString(timestampString)
       case (IntegerType, v: Int) => gen.writeNumber(v)
       case (ShortType, v: Short) => gen.writeNumber(v)
       case (FloatType, v: Float) => gen.writeNumber(v)
@@ -46,7 +52,9 @@ private[sql] object JacksonGenerator {
       case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
       case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
       case (BooleanType, v: Boolean) => gen.writeBoolean(v)
-      case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString)
+      case (DateType, v: Int) =>
+        val dateString = options.dateFormat.format(DateTimeUtils.toJavaDate(v))
+        gen.writeString(dateString)
       // For UDT values, they should be in the SQL type's corresponding value type.
       // We should not see values in the user-defined class at here.
       // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 733fcbf..a5417dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.ByteArrayOutputStream
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
 
 import com.fasterxml.jackson.core._
 
@@ -56,28 +57,30 @@ object JacksonParser extends Logging {
   def convertRootField(
       factory: JsonFactory,
       parser: JsonParser,
-      schema: DataType): Any = {
+      schema: DataType,
+      configOptions: JSONOptions): Any = {
     import com.fasterxml.jackson.core.JsonToken._
     (parser.getCurrentToken, schema) match {
       case (START_ARRAY, st: StructType) =>
         // SPARK-3308: support reading top level JSON arrays and take every element
         // in such an array as a row
-        convertArray(factory, parser, st)
+        convertArray(factory, parser, st, configOptions)
 
       case (START_OBJECT, ArrayType(st, _)) =>
         // the business end of SPARK-3308:
         // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
+        convertField(factory, parser, st, configOptions) :: Nil
 
       case _ =>
-        convertField(factory, parser, schema)
+        convertField(factory, parser, schema, configOptions)
     }
   }
 
   private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
-      schema: DataType): Any = {
+      schema: DataType,
+      configOptions: JSONOptions): Any = {
     import com.fasterxml.jackson.core.JsonToken._
     (parser.getCurrentToken, schema) match {
       case (null | VALUE_NULL, _) =>
@@ -85,7 +88,7 @@ object JacksonParser extends Logging {
 
       case (FIELD_NAME, _) =>
         parser.nextToken()
-        convertField(factory, parser, schema)
+        convertField(factory, parser, schema, configOptions)
 
       case (VALUE_STRING, StringType) =>
         UTF8String.fromString(parser.getText)
@@ -99,19 +102,29 @@ object JacksonParser extends Logging {
 
       case (VALUE_STRING, DateType) =>
         val stringValue = parser.getText
-        if (stringValue.contains("-")) {
-          // The format of this string will probably be "yyyy-mm-dd".
-          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
-        } else {
-          // In Spark 1.5.0, we store the data as number of days since epoch in string.
-          // So, we just convert it to Int.
-          stringValue.toInt
-        }
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
+        Try(DateTimeUtils.millisToDays(configOptions.dateFormat.parse(parser.getText).getTime))
+          .getOrElse {
+            // If it fails to parse, fall back to the parser used in 2.0 and 1.x for backwards
+            // compatibility.
+            Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+              .getOrElse {
+                // In Spark 1.5.0, we store the data as number of days since epoch in string.
+                // So, we just convert it to Int.
+                stringValue.toInt
+              }
+          }
 
       case (VALUE_STRING, TimestampType) =>
         // This one will lose microseconds parts.
         // See https://issues.apache.org/jira/browse/SPARK-10681.
-        DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+        Try(configOptions.timestampFormat.parse(parser.getText).getTime * 1000L)
+          .getOrElse {
+            // If it fails to parse, fall back to the parser used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+          }
 
       case (VALUE_NUMBER_INT, TimestampType) =>
         parser.getLongValue * 1000000L
@@ -179,16 +192,16 @@ object JacksonParser extends Logging {
         false
 
       case (START_OBJECT, st: StructType) =>
-        convertObject(factory, parser, st)
+        convertObject(factory, parser, st, configOptions)
 
       case (START_ARRAY, ArrayType(st, _)) =>
-        convertArray(factory, parser, st)
+        convertArray(factory, parser, st, configOptions)
 
       case (START_OBJECT, MapType(StringType, kt, _)) =>
-        convertMap(factory, parser, kt)
+        convertMap(factory, parser, kt, configOptions)
 
       case (_, udt: UserDefinedType[_]) =>
-        convertField(factory, parser, udt.sqlType)
+        convertField(factory, parser, udt.sqlType, configOptions)
 
       case (token, dataType) =>
         // We cannot parse this token based on the given data type. So, we throw a
@@ -207,12 +220,13 @@ object JacksonParser extends Logging {
   private def convertObject(
       factory: JsonFactory,
       parser: JsonParser,
-      schema: StructType): InternalRow = {
+      schema: StructType,
+      configOptions: JSONOptions): InternalRow = {
     val row = new GenericMutableRow(schema.length)
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, convertField(factory, parser, schema(index).dataType))
+          row.update(index, convertField(factory, parser, schema(index).dataType, configOptions))
 
         case None =>
           parser.skipChildren()
@@ -228,12 +242,13 @@ object JacksonParser extends Logging {
   private def convertMap(
       factory: JsonFactory,
       parser: JsonParser,
-      valueType: DataType): MapData = {
+      valueType: DataType,
+      configOptions: JSONOptions): MapData = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += convertField(factory, parser, valueType)
+      values += convertField(factory, parser, valueType, configOptions)
     }
     ArrayBasedMapData(keys.toArray, values.toArray)
   }
@@ -241,10 +256,11 @@ object JacksonParser extends Logging {
   private def convertArray(
       factory: JsonFactory,
       parser: JsonParser,
-      elementType: DataType): ArrayData = {
+      elementType: DataType,
+      configOptions: JSONOptions): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += convertField(factory, parser, elementType)
+      values += convertField(factory, parser, elementType, configOptions)
     }
 
     new GenericArrayData(values.toArray)
@@ -285,7 +301,7 @@ object JacksonParser extends Logging {
           Utils.tryWithResource(factory.createParser(record)) { parser =>
             parser.nextToken()
 
-            convertRootField(factory, parser, schema) match {
+            convertRootField(factory, parser, schema, configOptions) match {
               case null => failedRecord(record)
               case row: InternalRow => row :: Nil
               case array: ArrayData =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index c58de3a..decbdda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -85,7 +85,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
           bucketId: Option[Int],
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, bucketId, dataSchema, context)
+        new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
       }
     }
   }
@@ -155,6 +155,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
 private[json] class JsonOutputWriter(
     path: String,
+    options: JSONOptions,
     bucketId: Option[Int],
     dataSchema: StructType,
     context: TaskAttemptContext)
@@ -181,7 +182,7 @@ private[json] class JsonOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    JacksonGenerator(dataSchema, gen)(row)
+    JacksonGenerator(dataSchema, gen, options)(row)
     gen.flush()
 
     result.set(writer.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e606b2..e0a19b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -186,6 +186,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`columnNameOfCorruptRecord` (default is the value specified in
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
    * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 2.0.0
    */
@@ -228,10 +234,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * value.</li>
    * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
    * value.</li>
-   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af4..5e00f66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 8cd76dd..f68d220 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.UnsupportedCharsetException
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 
+import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
@@ -477,7 +478,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "true",
-      "dateFormat" -> "dd/MM/yyyy hh:mm")
+      "timestampFormat" -> "dd/MM/yyyy HH:mm")
     val results = spark.read
       .format("csv")
       .options(options)
@@ -485,7 +486,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .select("date")
       .collect()
 
-    val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm")
+    val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm")
     val expected =
       Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)),
         Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)),
@@ -691,4 +692,155 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
     verifyCars(cars, withHeader = true, checkValues = false)
   }
+
+  test("Write timestamps correctly in ISO8601 format by default") {
+    withTempDir { dir =>
+      val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+      val timestamps = spark.read
+        .format("csv")
+        .option("inferSchema", "true")
+        .option("header", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestamps.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601timestampsPath)
+
+      // This will load back the timestamps as string.
+      val iso8601Timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(iso8601timestampsPath)
+
+      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+      val expectedTimestamps = timestamps.collect().map { r =>
+        // This should be ISO8601 formatted string.
+        Row(iso8501.format(r.toSeq.head))
+      }
+
+      checkAnswer(iso8601Timestamps, expectedTimestamps)
+    }
+  }
+
+  test("Write dates correctly in ISO8601 format by default") {
+    withTempDir { dir =>
+      val customSchema = new StructType(Array(StructField("date", DateType, true)))
+      val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+      val dates = spark.read
+        .format("csv")
+        .schema(customSchema)
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      dates.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601datesPath)
+
+      // This will load back the dates as string.
+      val iso8601dates = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(iso8601datesPath)
+
+      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd")
+      val expectedDates = dates.collect().map { r =>
+        // This should be ISO8601 formatted string.
+        Row(iso8501.format(r.toSeq.head))
+      }
+
+      checkAnswer(iso8601dates, expectedDates)
+    }
+  }
+
+  test("Roundtrip in reading and writing timestamps") {
+    withTempDir { dir =>
+      val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+      val timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .load(testFile(datesFile))
+
+      timestamps.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601timestampsPath)
+
+      val iso8601timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .load(iso8601timestampsPath)
+
+      checkAnswer(iso8601timestamps, timestamps)
+    }
+  }
+
+  test("Write dates correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", DateType, true)))
+    withTempDir { dir =>
+      // With dateFormat option.
+      val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
+      val datesWithFormat = spark.read
+        .format("csv")
+        .schema(customSchema)
+        .option("header", "true")
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      datesWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("dateFormat", "yyyy/MM/dd")
+        .save(datesWithFormatPath)
+
+      // This will load back the dates as string.
+      val stringDatesWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(datesWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26"),
+        Row("2014/10/27"),
+        Row("2016/01/28"))
+
+      checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+    }
+  }
+
+  test("Write timestamps correctly with dateFormat option") {
+    withTempDir { dir =>
+      // With timestampFormat option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+      val timestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestampsWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringTimestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/26 18:00"),
+        Row("2014/10/27 18:30"),
+        Row("2016/01/28 20:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b2..3ce643e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
     assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
     assert(CSVTypeCast.castTo("true", BooleanType) == true)
 
-    val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
-    assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
-      expectedTime * 1000L)
-    assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
-      DateTimeUtils.millisToDays(expectedTime))
+    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+    val castedTimestamp =
+      CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+    assert(castedTimestamp == expectedTime * 1000L)
+
+    val customDate = "31/01/2015"
+    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val castedDate = CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, dateOptions)
+    assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
     assert(CSVTypeCast.castTo(timestamp, TimestampType) ==

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0b0e64a..1ba5b81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -64,9 +64,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         generator.flush()
       }
 
+      val dummyOption = new JSONOptions(Map.empty[String, String])
       Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
         parser.nextToken()
-        JacksonParser.convertRootField(factory, parser, dataType)
+        JacksonParser.convertRootField(factory, parser, dataType, dummyOption)
       }
     }
 
@@ -99,15 +100,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
-        enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
-      enforceCorrectType(ISO8601Time1, DateType))
     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
+      enforceCorrectType(ISO8601Time1, TimestampType))
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
-        enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(10801000),
-      enforceCorrectType(ISO8601Time2, DateType))
+      enforceCorrectType(ISO8601Time2, TimestampType))
+
+    val ISO8601Date = "1970-01-01"
+    checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+      enforceCorrectType(ISO8601Date, DateType))
   }
 
   test("Get compatible type") {
@@ -1662,4 +1663,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(df.schema.size === 2)
     df.collect()
   }
+
+  test("Write dates correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", DateType, true)))
+    withTempDir { dir =>
+      // With dateFormat option.
+      val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
+      val datesWithFormat = spark.read
+        .schema(customSchema)
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+
+      datesWithFormat.write
+        .format("json")
+        .option("dateFormat", "yyyy/MM/dd")
+        .save(datesWithFormatPath)
+
+      // This will load back the dates as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringDatesWithFormat = spark.read
+        .schema(stringSchema)
+        .json(datesWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26"),
+        Row("2014/10/27"),
+        Row("2016/01/28"))
+
+      checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+    }
+  }
+
+  test("Write timestamps correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+    withTempDir { dir =>
+      // With timestampFormat option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringTimestampsWithFormat = spark.read
+        .schema(stringSchema)
+        .json(timestampsWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26 18:00"),
+        Row("2014/10/27 18:30"),
+        Row("2016/01/28 20:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index f4a3336..d1d82fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -222,6 +222,12 @@ private[json] trait TestJsonData {
     spark.sparkContext.parallelize(
       s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
 
+  def datesRecords: RDD[String] =
+    spark.sparkContext.parallelize(
+      """{"date": "26/08/2015 18:00"}""" ::
+        """{"date": "27/10/2014 18:30"}""" ::
+        """{"date": "28/01/2016 20:00"}""" :: Nil)
+
   lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())

http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee..52486b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -32,6 +32,10 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: NullType => false
     case _: BinaryType => false
+    // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
+    // in `DateTimeUtils` parses the formatted string wrongly when the date is
+    // too early. (e.g. "1600-07-13T08:36:32.847").
+    case _: TimestampType => false
     case _: CalendarIntervalType => false
     case _ => true
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org