You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/24 20:16:29 UTC

spark git commit: [SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON

Repository: spark
Updated Branches:
  refs/heads/master 891ac2b91 -> 29952ed09


[SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON

## What changes were proposed in this pull request?

### Default - ISO 8601

Currently, CSV datasource is writing `Timestamp` and `Date` as numeric form and JSON datasource is writing both as below:

- CSV
  ```
  // TimestampType
  1414459800000000
  // DateType
  16673
  ```

- Json

  ```
  // TimestampType
  1970-01-01 11:46:40.0
  // DateType
  1970-01-01
  ```

So, for CSV we can't read back what we write and for JSON it becomes ambiguous because the timezone is being missed.

So, this PR make both **write** `Timestamp` and `Date` in ISO 8601 formatted string (please refer the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).

- For `Timestamp` it becomes as below: (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`)

  ```
  1970-01-01T02:00:01.000-01:00
  ```

- For `Date` it becomes as below (`yyyy-MM-dd`)

  ```
  1970-01-01
  ```

### Custom date format option - `dateFormat`

This PR also adds the support to write and read dates and timestamps in a formatted string as below:

- **DateType**

  - With `dateFormat` option (e.g. `yyyy/MM/dd`)

    ```
    +----------+
    |      date|
    +----------+
    |2015/08/26|
    |2014/10/27|
    |2016/01/28|
    +----------+
    ```

### Custom date format option - `timestampFormat`

- **TimestampType**

  - With `dateFormat` option (e.g. `dd/MM/yyyy HH:mm`)

    ```
    +----------------+
    |            date|
    +----------------+
    |2015/08/26 18:00|
    |2014/10/27 18:30|
    |2016/01/28 20:00|
    +----------------+
    ```

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29952ed0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29952ed0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29952ed0

Branch: refs/heads/master
Commit: 29952ed096fd2a0a19079933ff691671d6f00835
Parents: 891ac2b
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Aug 24 22:16:20 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Aug 24 22:16:20 2016 +0200

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  56 +++++--
 python/pyspark/sql/streaming.py                 |  30 +++-
 .../org/apache/spark/sql/DataFrameReader.scala  |  18 ++-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  12 ++
 .../datasources/csv/CSVInferSchema.scala        |  42 ++---
 .../execution/datasources/csv/CSVOptions.scala  |  15 +-
 .../execution/datasources/csv/CSVRelation.scala |  43 ++++-
 .../datasources/json/JSONOptions.scala          |   9 ++
 .../datasources/json/JacksonGenerator.scala     |  13 +-
 .../datasources/json/JacksonParser.scala        |  27 +++-
 .../datasources/json/JsonFileFormat.scala       |   5 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  19 ++-
 .../datasources/csv/CSVInferSchemaSuite.scala   |   4 +-
 .../execution/datasources/csv/CSVSuite.scala    | 157 ++++++++++++++++++-
 .../datasources/csv/CSVTypeCastSuite.scala      |  17 +-
 .../execution/datasources/json/JsonSuite.scala  |  67 +++++++-
 .../datasources/json/TestJsonData.scala         |   6 +
 .../sql/sources/JsonHadoopFsRelationSuite.scala |   4 +
 18 files changed, 454 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e..3da6f49 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
         (one object per record) and returns the result as a :class`DataFrame`.
@@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils):
                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                           it uses the value specified in
                                           ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils):
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils):
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils):
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             path = [path]
@@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
-        self._set_opts(compression=compression)
+        self._set_opts(
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.json(path)
 
     @since(1.4)
@@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils):
 
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
-            header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+            timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils):
                        the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                           the default value, empty string.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
-                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+                       dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555..3761d2b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+             timestampFormat=None):
         """
         Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
 
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
                                           ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                           it uses the value specified in
                                           ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e23dacc..c060091 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -280,7 +280,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`columnNameOfCorruptRecord` (default is the value specified in
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
    * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * </ul>
+   *
    * @since 2.0.0
    */
   @scala.annotation.varargs
@@ -374,10 +381,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * value.</li>
    * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
    * value.</li>
-   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 44a9f31..a9049a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -483,6 +483,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 1.4.0
    */
@@ -575,6 +581,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index de3d889..f1b4c11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -139,20 +139,14 @@ private[csv] object CSVInferSchema {
   }
 
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    if (options.dateFormat != null) {
-      // This case infers a custom `dataFormat` is set.
-      if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
-    } else {
+    // This case infers a custom `dataFormat` is set.
+    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+      TimestampType
+    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwords competibility.
-      if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
+      TimestampType
+    } else {
+      tryParseBoolean(field, options)
     }
   }
 
@@ -277,18 +271,24 @@ private[csv] object CSVTypeCast {
           val value = new BigDecimal(datum.replaceAll(",", ""))
           Decimal(value, dt.precision, dt.scale)
         }
-      case _: TimestampType if options.dateFormat != null =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        options.dateFormat.parse(datum).getTime * 1000L
       case _: TimestampType =>
         // This one will lose microseconds parts.
         // See https://issues.apache.org/jira/browse/SPARK-10681.
-        DateTimeUtils.stringToTime(datum).getTime  * 1000L
-      case _: DateType if options.dateFormat != null =>
-        DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+        Try(options.timestampFormat.parse(datum).getTime * 1000L)
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.stringToTime(datum).getTime  * 1000L
+          }
       case _: DateType =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.x
+        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+          }
       case _: StringType => UTF8String.fromString(datum)
       case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 10fe541..364d7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
-import java.text.SimpleDateFormat
+
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -101,11 +102,13 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  // Share date format object as it is expensive to parse date pattern.
-  val dateFormat: SimpleDateFormat = {
-    val dateFormat = parameters.get("dateFormat")
-    dateFormat.map(new SimpleDateFormat(_)).orNull
-  }
+  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
+  val dateFormat: FastDateFormat =
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
 
   val maxColumns = getInt("maxColumns", 20480)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index de2d633..33b170b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
 import org.apache.spark.sql.types._
 
@@ -187,6 +188,14 @@ private[csv] class CsvOutputWriter(
   // create the Generator without separator inserted between 2 records
   private[this] val text = new Text()
 
+  // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+  // When the value is null, this converter should not be called.
+  private type ValueConverter = (InternalRow, Int) => String
+
+  // `ValueConverter`s for all values in the fields of the schema
+  private val valueConverters: Array[ValueConverter] =
+    dataSchema.map(_.dataType).map(makeConverter).toArray
+
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -203,18 +212,40 @@ private[csv] class CsvOutputWriter(
   private var records: Long = 0L
   private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
 
-  private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
-    if (field != null) {
-      field.toString
-    } else {
-      params.nullValue
+  private def rowToString(row: InternalRow): Seq[String] = {
+    var i = 0
+    val values = new Array[String](row.numFields)
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        values(i) = valueConverters(i).apply(row, i)
+      } else {
+        values(i) = params.nullValue
+      }
+      i += 1
     }
+    values
+  }
+
+  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+    case DateType =>
+      (row: InternalRow, ordinal: Int) =>
+        params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+
+    case TimestampType =>
+      (row: InternalRow, ordinal: Int) =>
+        params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+
+    case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
+
+    case dt: DataType =>
+      (row: InternalRow, ordinal: Int) =>
+        row.get(ordinal, dt).toString
   }
 
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
+    csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag)
     records += 1
     if (records % FLUSH_BATCH_SIZE == 0) {
       flush()

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 66f1126..02d211d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -53,6 +54,14 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
+  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
+  val dateFormat: FastDateFormat =
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
     logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 23f4a55..270e7fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -26,7 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
-private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
+private[sql] class JacksonGenerator(
+    schema: StructType,
+    writer: Writer,
+    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
@@ -76,11 +79,15 @@ private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeString(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString)
+        val timestampString =
+          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        gen.writeString(timestampString)
 
     case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeString(DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString)
+        val dateString =
+          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        gen.writeString(dateString)
 
     case BinaryType =>
       (row: SpecializedGetters, ordinal: Int) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 4ae9376..359a3e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.ByteArrayOutputStream
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
 
 import com.fasterxml.jackson.core._
 
@@ -204,7 +205,12 @@ class JacksonParser(
         case VALUE_STRING =>
           // This one will lose microseconds parts.
           // See https://issues.apache.org/jira/browse/SPARK-10681.
-          DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+          Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+            }
 
         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L
@@ -214,13 +220,18 @@ class JacksonParser(
       (parser: JsonParser) => parseJsonToken(parser, dataType) {
         case VALUE_STRING =>
           val stringValue = parser.getText
-          if (stringValue.contains("-")) {
-            // The format of this string will probably be "yyyy-mm-dd".
-            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
-          } else {
-            // In Spark 1.5.0, we store the data as number of days since epoch in string.
-            // So, we just convert it to Int.
-            stringValue.toInt
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.x
+          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+            .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+              .getOrElse {
+              // In Spark 1.5.0, we store the data as number of days since epoch in string.
+              // So, we just convert it to Int.
+              stringValue.toInt
+            }
           }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 16150b9..7421314 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -83,7 +83,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
           bucketId: Option[Int],
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, bucketId, dataSchema, context)
+        new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
       }
     }
   }
@@ -149,6 +149,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
 private[json] class JsonOutputWriter(
     path: String,
+    options: JSONOptions,
     bucketId: Option[Int],
     dataSchema: StructType,
     context: TaskAttemptContext)
@@ -156,7 +157,7 @@ private[json] class JsonOutputWriter(
 
   private[this] val writer = new CharArrayWriter()
   // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer)
+  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
   private[this] val result = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e606b2..3ad1125 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -186,6 +186,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`columnNameOfCorruptRecord` (default is the value specified in
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
    * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    *
    * @since 2.0.0
    */
@@ -228,10 +234,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * value.</li>
    * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
    * value.</li>
-   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+   * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+   * date type.</li>
+   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+   * indicates a timestamp format. Custom date formats follow the formats at
+   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
@@ -258,7 +266,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
    * considered in every trigger.</li>
    * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
-   * whether we should merge schemas collected from all Parquet part-files. This will override
+   * whether we should merge schemas collected from all
+   * Parquet part-files. This will override
    * `spark.sql.parquet.mergeSchema`.</li>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af4..5e00f66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 8cd76dd..2befad6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,11 +22,13 @@ import java.nio.charset.UnsupportedCharsetException
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 
+import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
@@ -477,7 +479,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "true",
-      "dateFormat" -> "dd/MM/yyyy hh:mm")
+      "timestampFormat" -> "dd/MM/yyyy HH:mm")
     val results = spark.read
       .format("csv")
       .options(options)
@@ -485,7 +487,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .select("date")
       .collect()
 
-    val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm")
+    val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm")
     val expected =
       Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)),
         Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)),
@@ -691,4 +693,155 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
     verifyCars(cars, withHeader = true, checkValues = false)
   }
+
+  test("Write timestamps correctly in ISO8601 format by default") {
+    withTempDir { dir =>
+      val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+      val timestamps = spark.read
+        .format("csv")
+        .option("inferSchema", "true")
+        .option("header", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestamps.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601timestampsPath)
+
+      // This will load back the timestamps as string.
+      val iso8601Timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(iso8601timestampsPath)
+
+      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+      val expectedTimestamps = timestamps.collect().map { r =>
+        // This should be ISO8601 formatted string.
+        Row(iso8501.format(r.toSeq.head))
+      }
+
+      checkAnswer(iso8601Timestamps, expectedTimestamps)
+    }
+  }
+
+  test("Write dates correctly in ISO8601 format by default") {
+    withTempDir { dir =>
+      val customSchema = new StructType(Array(StructField("date", DateType, true)))
+      val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+      val dates = spark.read
+        .format("csv")
+        .schema(customSchema)
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      dates.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601datesPath)
+
+      // This will load back the dates as string.
+      val iso8601dates = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(iso8601datesPath)
+
+      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd")
+      val expectedDates = dates.collect().map { r =>
+        // This should be ISO8601 formatted string.
+        Row(iso8501.format(r.toSeq.head))
+      }
+
+      checkAnswer(iso8601dates, expectedDates)
+    }
+  }
+
+  test("Roundtrip in reading and writing timestamps") {
+    withTempDir { dir =>
+      val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+      val timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .load(testFile(datesFile))
+
+      timestamps.write
+        .format("csv")
+        .option("header", "true")
+        .save(iso8601timestampsPath)
+
+      val iso8601timestamps = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .load(iso8601timestampsPath)
+
+      checkAnswer(iso8601timestamps, timestamps)
+    }
+  }
+
+  test("Write dates correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", DateType, true)))
+    withTempDir { dir =>
+      // With dateFormat option.
+      val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
+      val datesWithFormat = spark.read
+        .format("csv")
+        .schema(customSchema)
+        .option("header", "true")
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      datesWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("dateFormat", "yyyy/MM/dd")
+        .save(datesWithFormatPath)
+
+      // This will load back the dates as string.
+      val stringDatesWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(datesWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26"),
+        Row("2014/10/27"),
+        Row("2016/01/28"))
+
+      checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+    }
+  }
+
+  test("Write timestamps correctly with dateFormat option") {
+    withTempDir { dir =>
+      // With dateFormat option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+      val timestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestampsWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringTimestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/26 18:00"),
+        Row("2014/10/27 18:30"),
+        Row("2016/01/28 20:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b2..3ce643e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
     assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
     assert(CSVTypeCast.castTo("true", BooleanType) == true)
 
-    val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
-    assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
-      expectedTime * 1000L)
-    assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
-      DateTimeUtils.millisToDays(expectedTime))
+    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+    val castedTimestamp =
+      CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+    assert(castedTimestamp == expectedTime * 1000L)
+
+    val customDate = "31/01/2015"
+    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val castedDate = CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, dateOptions)
+    assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
     assert(CSVTypeCast.castTo(timestamp, TimestampType) ==

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 342fd3e..63a9061 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -101,15 +101,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
         enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
-      enforceCorrectType(ISO8601Time1, DateType))
-    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
         enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(10801000),
-      enforceCorrectType(ISO8601Time2, DateType))
+
+    val ISO8601Date = "1970-01-01"
+    checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+      enforceCorrectType(ISO8601Date, DateType))
   }
 
   test("Get compatible type") {
@@ -1664,4 +1664,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(df.schema.size === 2)
     df.collect()
   }
+
+  test("Write dates correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", DateType, true)))
+    withTempDir { dir =>
+      // With dateFormat option.
+      val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
+      val datesWithFormat = spark.read
+        .schema(customSchema)
+        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+
+      datesWithFormat.write
+        .format("json")
+        .option("dateFormat", "yyyy/MM/dd")
+        .save(datesWithFormatPath)
+
+      // This will load back the dates as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringDatesWithFormat = spark.read
+        .schema(stringSchema)
+        .json(datesWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26"),
+        Row("2014/10/27"),
+        Row("2016/01/28"))
+
+      checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+    }
+  }
+
+  test("Write timestamps correctly with dateFormat option") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+    withTempDir { dir =>
+      // With dateFormat option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringTimestampsWithFormat = spark.read
+        .schema(stringSchema)
+        .json(timestampsWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/26 18:00"),
+        Row("2014/10/27 18:30"),
+        Row("2016/01/28 20:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index f4a3336..a400940 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -222,6 +222,12 @@ private[json] trait TestJsonData {
     spark.sparkContext.parallelize(
       s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
 
+  def datesRecords: RDD[String] =
+    spark.sparkContext.parallelize(
+      """{"date": "26/08/2015 18:00"}""" ::
+      """{"date": "27/10/2014 18:30"}""" ::
+      """{"date": "28/01/2016 20:00"}""" :: Nil)
+
   lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())

http://git-wip-us.apache.org/repos/asf/spark/blob/29952ed0/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee..52486b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -32,6 +32,10 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: NullType => false
     case _: BinaryType => false
+    // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
+    // in `DateTimeUtils` parses the formatted string wrongly when the date is
+    // too early. (e.g. "1600-07-13T08:36:32.847").
+    case _: TimestampType => false
     case _: CalendarIntervalType => false
     case _ => true
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org