Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/25 04:19:39 UTC
spark git commit: [SPARK-16216][SQL][BRANCH-2.0] Backport Read/write
dateFormat/timestampFormat options for CSV and JSON
Repository: spark
Updated Branches:
refs/heads/branch-2.0 9f363a690 -> 3258f27a8
[SPARK-16216][SQL][BRANCH-2.0] Backport Read/write dateFormat/timestampFormat options for CSV and JSON
## What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/14279 to 2.0.
## How was this patch tested?
Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.
Author: hyukjinkwon <gu...@gmail.com>
Closes #14799 from HyukjinKwon/SPARK-16216-json-csv-backport.
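For illustration, the new options can be exercised as follows. This is a minimal sketch against the options added in this patch; the file paths are hypothetical:

    // Reading: parse custom-formatted date and timestamp columns.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("dateFormat", "dd/MM/yyyy")
      .option("timestampFormat", "dd/MM/yyyy HH:mm")
      .csv("/tmp/input.csv")

    // Writing: control how DateType/TimestampType columns are rendered.
    df.write
      .option("dateFormat", "yyyy/MM/dd")
      .option("timestampFormat", "yyyy/MM/dd HH:mm")
      .csv("/tmp/output.csv")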
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3258f27a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3258f27a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3258f27a
Branch: refs/heads/branch-2.0
Commit: 3258f27a881dfeb5ab8bae90c338603fa4b6f9d8
Parents: 9f363a6
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Aug 24 21:19:35 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Aug 24 21:19:35 2016 -0700
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 56 +++++--
python/pyspark/sql/streaming.py | 30 +++-
.../org/apache/spark/sql/DataFrameReader.scala | 17 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 12 ++
.../datasources/csv/CSVInferSchema.scala | 42 ++---
.../execution/datasources/csv/CSVOptions.scala | 15 +-
.../execution/datasources/csv/CSVRelation.scala | 43 ++++-
.../datasources/json/JSONOptions.scala | 9 ++
.../datasources/json/JacksonGenerator.scala | 14 +-
.../datasources/json/JacksonParser.scala | 68 ++++----
.../datasources/json/JsonFileFormat.scala | 5 +-
.../spark/sql/streaming/DataStreamReader.scala | 16 +-
.../datasources/csv/CSVInferSchemaSuite.scala | 4 +-
.../execution/datasources/csv/CSVSuite.scala | 156 ++++++++++++++++++-
.../datasources/csv/CSVTypeCastSuite.scala | 17 +-
.../execution/datasources/json/JsonSuite.scala | 74 ++++++++-
.../datasources/json/TestJsonData.scala | 6 +
.../sql/sources/JsonHadoopFsRelationSuite.scala | 4 +
18 files changed, 478 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e..3da6f49 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
"""
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
(one object per record) and returns the result as a :class:`DataFrame`.
@@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
@@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils):
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils):
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
- self._set_opts(compression=compression)
+ self._set_opts(
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
@since(1.4)
@@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
- header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+ header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+ timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils):
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
- nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+ nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+ dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.csv(path)
@since(1.5)
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555..3761d2b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+ timestampFormat=None):
"""
Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e8c2885..083c2e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -280,6 +280,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* </ul>
* @since 2.0.0
*/
@@ -376,10 +382,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* value.</li>
* <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
* value.</li>
- * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
- * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
- * and timestamp type. By default, it is `null` which means trying to parse times and date by
- * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
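To make the reader docs above concrete, a JSON read with a custom timestamp pattern might look like this (a sketch; the schema and path are hypothetical):

    import org.apache.spark.sql.types._

    // Strings such as "24/08/2016 21:19" parse into TimestampType when the
    // schema requests it and timestampFormat matches.
    val schema = new StructType().add("ts", TimestampType)
    val jsonDf = spark.read
      .schema(schema)
      .option("timestampFormat", "dd/MM/yyyy HH:mm")
      .json("/tmp/events.json")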
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12b3046..767af99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -452,6 +452,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 1.4.0
*/
@@ -544,6 +550,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 2.0.0
*/
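On the write side, the same pair of options shapes the serialized text. For example, given some DataFrame df (a sketch; the path is hypothetical):

    // DateType and TimestampType columns are rendered through these patterns
    // instead of Date.toString / Timestamp.toString.
    df.write
      .option("dateFormat", "yyyy/MM/dd")
      .option("timestampFormat", "yyyy/MM/dd HH:mm")
      .json("/tmp/out-json")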
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index de3d889..f1b4c11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -139,20 +139,14 @@ private[csv] object CSVInferSchema {
}
private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
- if (options.dateFormat != null) {
- // This case infers a custom `dataFormat` is set.
- if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
- TimestampType
- } else {
- tryParseBoolean(field, options)
- }
- } else {
+ // Tries the configured `timestampFormat` first.
+ if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+ TimestampType
+ } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
// We keep this for backwards compatibility.
- if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
- TimestampType
- } else {
- tryParseBoolean(field, options)
- }
+ TimestampType
+ } else {
+ tryParseBoolean(field, options)
}
}
@@ -277,18 +271,24 @@ private[csv] object CSVTypeCast {
val value = new BigDecimal(datum.replaceAll(",", ""))
Decimal(value, dt.precision, dt.scale)
}
- case _: TimestampType if options.dateFormat != null =>
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- options.dateFormat.parse(datum).getTime * 1000L
case _: TimestampType =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(datum).getTime * 1000L
- case _: DateType if options.dateFormat != null =>
- DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+ Try(options.timestampFormat.parse(datum).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(datum).getTime * 1000L
+ }
case _: DateType =>
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+ // Tries to parse with the configured `dateFormat` first.
+ Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+ }
case _: StringType => UTF8String.fromString(datum)
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
}
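The casting change above follows a try-the-configured-format-first pattern; reduced to a standalone sketch (not the exact private helper):

    import scala.util.Try
    import org.apache.commons.lang3.time.FastDateFormat
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Prefer the user-supplied timestampFormat, then fall back to the
    // pre-2.0 parsing path so existing data keeps loading.
    def toTimestampMicros(datum: String, timestampFormat: FastDateFormat): Long =
      Try(timestampFormat.parse(datum).getTime * 1000L)
        .getOrElse(DateTimeUtils.stringToTime(datum).getTime * 1000L)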
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 10fe541..364d7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
-import java.text.SimpleDateFormat
+
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -101,11 +102,13 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
name.map(CompressionCodecs.getCodecClassName)
}
- // Share date format object as it is expensive to parse date pattern.
- val dateFormat: SimpleDateFormat = {
- val dateFormat = parameters.get("dateFormat")
- dateFormat.map(new SimpleDateFormat(_)).orNull
- }
+ // Uses `FastDateFormat`, a thread-safe drop-in replacement for `SimpleDateFormat`.
+ val dateFormat: FastDateFormat =
+ FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+ val timestampFormat: FastDateFormat =
+ FastDateFormat.getInstance(
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
val maxColumns = getInt("maxColumns", 20480)
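For context, `FastDateFormat` is a thread-safe, largely drop-in replacement for `SimpleDateFormat`, which is why a single instance per options object can be shared safely; a small sketch:

    import org.apache.commons.lang3.time.FastDateFormat

    // A FastDateFormat instance is immutable, so sharing it across tasks is
    // safe; SimpleDateFormat is mutable and not thread-safe.
    val fmt = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
    val text = fmt.format(new java.util.Date(0L)) // epoch, rendered in the JVM's zone
    val millis = fmt.parse("2016-08-24T21:19:35.000-07:00").getTime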
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 10d84f4..d0d5ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._
@@ -179,6 +180,14 @@ private[csv] class CsvOutputWriter(
// create the Generator without separator inserted between 2 records
private[this] val text = new Text()
+ // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+ // When the value is null, this converter should not be called.
+ private type ValueConverter = (InternalRow, Int) => String
+
+ // `ValueConverter`s for all values in the fields of the schema
+ private val valueConverters: Array[ValueConverter] =
+ dataSchema.map(_.dataType).map(makeConverter).toArray
+
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -195,18 +204,40 @@ private[csv] class CsvOutputWriter(
private var records: Long = 0L
private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
- private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
- if (field != null) {
- field.toString
- } else {
- params.nullValue
+ private def rowToString(row: InternalRow): Seq[String] = {
+ var i = 0
+ val values = new Array[String](row.numFields)
+ while (i < row.numFields) {
+ if (!row.isNullAt(i)) {
+ values(i) = valueConverters(i).apply(row, i)
+ } else {
+ values(i) = params.nullValue
+ }
+ i += 1
}
+ values
+ }
+
+ private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ case DateType =>
+ (row: InternalRow, ordinal: Int) =>
+ params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+
+ case TimestampType =>
+ (row: InternalRow, ordinal: Int) =>
+ params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+
+ case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
+
+ case dt: DataType =>
+ (row: InternalRow, ordinal: Int) =>
+ row.get(ordinal, dt).toString
}
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
+ csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag)
records += 1
if (records % FLUSH_BATCH_SIZE == 0) {
flush()
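The rewritten writer resolves the formatting decision once per column and then applies it per row; the shape of that pattern, reduced to plain Scala (names here are illustrative, not Spark's):

    // Illustrative only: choose a per-column conversion function up front so
    // the per-row loop does no type dispatch.
    type ValueConverter = Any => String

    def makeConverter(isDate: Boolean, fmt: java.text.DateFormat): ValueConverter =
      if (isDate) { v => fmt.format(v.asInstanceOf[java.util.Date]) }
      else { v => v.toString }

    val fmt = new java.text.SimpleDateFormat("yyyy/MM/dd")
    val converters = Array(makeConverter(true, fmt), makeConverter(false, fmt))
    val row: Array[Any] = Array(new java.util.Date(0L), 42)
    val cells = row.indices.map(i => converters(i)(row(i)))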
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 66f1126..02d211d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -53,6 +54,14 @@ private[sql] class JSONOptions(
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+ // Uses `FastDateFormat`, a thread-safe drop-in replacement for `SimpleDateFormat`.
+ val dateFormat: FastDateFormat =
+ FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+ val timestampFormat: FastDateFormat =
+ FastDateFormat.getInstance(
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 8b920ec..800d43f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -32,11 +32,17 @@ private[sql] object JacksonGenerator {
* @param gen a JsonGenerator object
* @param row The row to convert
*/
- def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
+ def apply(
+ rowSchema: StructType,
+ gen: JsonGenerator,
+ options: JSONOptions = new JSONOptions(Map.empty[String, String]))
+ (row: InternalRow): Unit = {
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v) => gen.writeString(v.toString)
- case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString)
+ case (TimestampType, v: Long) =>
+ val timestampString = options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(v))
+ gen.writeString(timestampString)
case (IntegerType, v: Int) => gen.writeNumber(v)
case (ShortType, v: Short) => gen.writeNumber(v)
case (FloatType, v: Float) => gen.writeNumber(v)
@@ -46,7 +52,9 @@ private[sql] object JacksonGenerator {
case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
case (BooleanType, v: Boolean) => gen.writeBoolean(v)
- case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString)
+ case (DateType, v: Int) =>
+ val dateString = options.dateFormat.format(DateTimeUtils.toJavaDate(v))
+ gen.writeString(dateString)
// For UDT values, they should be in the SQL type's corresponding value type.
// We should not see values in the user-defined class at here.
// For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 733fcbf..a5417dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.ByteArrayOutputStream
import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
import com.fasterxml.jackson.core._
@@ -56,28 +57,30 @@ object JacksonParser extends Logging {
def convertRootField(
factory: JsonFactory,
parser: JsonParser,
- schema: DataType): Any = {
+ schema: DataType,
+ configOptions: JSONOptions): Any = {
import com.fasterxml.jackson.core.JsonToken._
(parser.getCurrentToken, schema) match {
case (START_ARRAY, st: StructType) =>
// SPARK-3308: support reading top level JSON arrays and take every element
// in such an array as a row
- convertArray(factory, parser, st)
+ convertArray(factory, parser, st, configOptions)
case (START_OBJECT, ArrayType(st, _)) =>
// the business end of SPARK-3308:
// when an object is found but an array is requested just wrap it in a list
- convertField(factory, parser, st) :: Nil
+ convertField(factory, parser, st, configOptions) :: Nil
case _ =>
- convertField(factory, parser, schema)
+ convertField(factory, parser, schema, configOptions)
}
}
private def convertField(
factory: JsonFactory,
parser: JsonParser,
- schema: DataType): Any = {
+ schema: DataType,
+ configOptions: JSONOptions): Any = {
import com.fasterxml.jackson.core.JsonToken._
(parser.getCurrentToken, schema) match {
case (null | VALUE_NULL, _) =>
@@ -85,7 +88,7 @@ object JacksonParser extends Logging {
case (FIELD_NAME, _) =>
parser.nextToken()
- convertField(factory, parser, schema)
+ convertField(factory, parser, schema, configOptions)
case (VALUE_STRING, StringType) =>
UTF8String.fromString(parser.getText)
@@ -99,19 +102,29 @@ object JacksonParser extends Logging {
case (VALUE_STRING, DateType) =>
val stringValue = parser.getText
- if (stringValue.contains("-")) {
- // The format of this string will probably be "yyyy-mm-dd".
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
- } else {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
+ // Tries to parse with the configured `dateFormat` first.
+ Try(DateTimeUtils.millisToDays(configOptions.dateFormat.parse(parser.getText).getTime))
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+ .getOrElse {
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
+ }
case (VALUE_STRING, TimestampType) =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ Try(configOptions.timestampFormat.parse(parser.getText).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ }
case (VALUE_NUMBER_INT, TimestampType) =>
parser.getLongValue * 1000000L
@@ -179,16 +192,16 @@ object JacksonParser extends Logging {
false
case (START_OBJECT, st: StructType) =>
- convertObject(factory, parser, st)
+ convertObject(factory, parser, st, configOptions)
case (START_ARRAY, ArrayType(st, _)) =>
- convertArray(factory, parser, st)
+ convertArray(factory, parser, st, configOptions)
case (START_OBJECT, MapType(StringType, kt, _)) =>
- convertMap(factory, parser, kt)
+ convertMap(factory, parser, kt, configOptions)
case (_, udt: UserDefinedType[_]) =>
- convertField(factory, parser, udt.sqlType)
+ convertField(factory, parser, udt.sqlType, configOptions)
case (token, dataType) =>
// We cannot parse this token based on the given data type. So, we throw a
@@ -207,12 +220,13 @@ object JacksonParser extends Logging {
private def convertObject(
factory: JsonFactory,
parser: JsonParser,
- schema: StructType): InternalRow = {
+ schema: StructType,
+ configOptions: JSONOptions): InternalRow = {
val row = new GenericMutableRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
- row.update(index, convertField(factory, parser, schema(index).dataType))
+ row.update(index, convertField(factory, parser, schema(index).dataType, configOptions))
case None =>
parser.skipChildren()
@@ -228,12 +242,13 @@ object JacksonParser extends Logging {
private def convertMap(
factory: JsonFactory,
parser: JsonParser,
- valueType: DataType): MapData = {
+ valueType: DataType,
+ configOptions: JSONOptions): MapData = {
val keys = ArrayBuffer.empty[UTF8String]
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
keys += UTF8String.fromString(parser.getCurrentName)
- values += convertField(factory, parser, valueType)
+ values += convertField(factory, parser, valueType, configOptions)
}
ArrayBasedMapData(keys.toArray, values.toArray)
}
@@ -241,10 +256,11 @@ object JacksonParser extends Logging {
private def convertArray(
factory: JsonFactory,
parser: JsonParser,
- elementType: DataType): ArrayData = {
+ elementType: DataType,
+ configOptions: JSONOptions): ArrayData = {
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_ARRAY)) {
- values += convertField(factory, parser, elementType)
+ values += convertField(factory, parser, elementType, configOptions)
}
new GenericArrayData(values.toArray)
@@ -285,7 +301,7 @@ object JacksonParser extends Logging {
Utils.tryWithResource(factory.createParser(record)) { parser =>
parser.nextToken()
- convertRootField(factory, parser, schema) match {
+ convertRootField(factory, parser, schema, configOptions) match {
case null => failedRecord(record)
case row: InternalRow => row :: Nil
case array: ArrayData =>
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index c58de3a..decbdda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -85,7 +85,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new JsonOutputWriter(path, bucketId, dataSchema, context)
+ new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
}
}
}
@@ -155,6 +155,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
private[json] class JsonOutputWriter(
path: String,
+ options: JSONOptions,
bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext)
@@ -181,7 +182,7 @@ private[json] class JsonOutputWriter(
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- JacksonGenerator(dataSchema, gen)(row)
+ JacksonGenerator(dataSchema, gen, options)(row)
gen.flush()
result.set(writer.toString)
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e606b2..e0a19b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -186,6 +186,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 2.0.0
*/
@@ -228,10 +234,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* value.</li>
* <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
* value.</li>
- * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
- * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
- * and timestamp type. By default, it is `null` which means trying to parse times and date by
- * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
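The same options apply on the streaming path; a sketch (the schema and path are hypothetical, and streaming file sources require a user-specified schema):

    import org.apache.spark.sql.types._

    val schema = new StructType().add("ts", TimestampType).add("v", StringType)
    val stream = spark.readStream
      .schema(schema)
      .option("timestampFormat", "dd/MM/yyyy HH:mm")
      .csv("/tmp/stream-input")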
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af4..5e00f66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Timestamp field types are inferred correctly via custom data format") {
- var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+ var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
- options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+ options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 8cd76dd..f68d220 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.UnsupportedCharsetException
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
@@ -477,7 +478,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val options = Map(
"header" -> "true",
"inferSchema" -> "true",
- "dateFormat" -> "dd/MM/yyyy hh:mm")
+ "timestampFormat" -> "dd/MM/yyyy HH:mm")
val results = spark.read
.format("csv")
.options(options)
@@ -485,7 +486,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.select("date")
.collect()
- val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm")
+ val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm")
val expected =
Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)),
Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)),
@@ -691,4 +692,155 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(cars, withHeader = true, checkValues = false)
}
+
+ test("Write timestamps correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("inferSchema", "true")
+ .option("header", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ // This will load back the timestamps as string.
+ val iso8601Timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601timestampsPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+ val expectedTimestamps = timestamps.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8601.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601Timestamps, expectedTimestamps)
+ }
+ }
+
+ test("Write dates correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+ val dates = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ dates.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601datesPath)
+
+ // This will load back the dates as string.
+ val iso8601dates = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601datesPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd")
+ val expectedDates = dates.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8601.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601dates, expectedDates)
+ }
+ }
+
+ test("Roundtrip in reading and writing timestamps") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(testFile(datesFile))
+
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ val iso8601timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(iso8601timestampsPath)
+
+ checkAnswer(iso8601timestamps, timestamps)
+ }
+ }
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
+ val datesWithFormat = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ datesWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringDatesWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+ val timestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestampsWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringTimestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b2..3ce643e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
assert(CSVTypeCast.castTo("true", BooleanType) == true)
- val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+ val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy HH:mm")
val customTimestamp = "31/01/2015 00:00"
- val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
- assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
- expectedTime * 1000L)
- assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
- DateTimeUtils.millisToDays(expectedTime))
+ val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+ val castedTimestamp =
+ CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+ assert(castedTimestamp == expectedTime * 1000L)
+
+ val customDate = "31/01/2015"
+ val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+ val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+ val castedDate = CSVTypeCast.castTo(customDate, DateType, nullable = true, dateOptions)
+ assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
val timestamp = "2015-01-01 00:00:00"
assert(CSVTypeCast.castTo(timestamp, TimestampType) ==
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0b0e64a..1ba5b81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -64,9 +64,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}
+ val dummyOption = new JSONOptions(Map.empty[String, String])
Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
parser.nextToken()
- JacksonParser.convertRootField(factory, parser, dataType)
+ JacksonParser.convertRootField(factory, parser, dataType, dummyOption)
}
}
@@ -99,15 +100,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
- checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
- enforceCorrectType(ISO8601Time1, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(3601000),
- enforceCorrectType(ISO8601Time1, DateType))
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
+ enforceCorrectType(ISO8601Time1, TimestampType))
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
- enforceCorrectType(ISO8601Time2, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(10801000),
- enforceCorrectType(ISO8601Time2, DateType))
+ enforceCorrectType(ISO8601Time2, TimestampType))
+
+ val ISO8601Date = "1970-01-01"
+ checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+ enforceCorrectType(ISO8601Date, DateType))
}
test("Get compatible type") {
@@ -1662,4 +1663,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(df.schema.size === 2)
df.collect()
}
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
+ val datesWithFormat = spark.read
+ .schema(customSchema)
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+
+ datesWithFormat.write
+ .format("json")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringDatesWithFormat = spark.read
+ .schema(stringSchema)
+ .json(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+ val timestampsWithFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+ timestampsWithFormat.write
+ .format("json")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index f4a3336..d1d82fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -222,6 +222,12 @@ private[json] trait TestJsonData {
spark.sparkContext.parallelize(
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+ def datesRecords: RDD[String] =
+ spark.sparkContext.parallelize(
+ """{"date": "26/08/2015 18:00"}""" ::
+ """{"date": "27/10/2014 18:30"}""" ::
+ """{"date": "28/01/2016 20:00"}""" :: Nil)
+
lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
http://git-wip-us.apache.org/repos/asf/spark/blob/3258f27a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee..52486b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -32,6 +32,10 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: NullType => false
case _: BinaryType => false
+ // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
+ // in `DateTimeUtils` parses the formatted string wrongly when the date is
+ // too early. (e.g. "1600-07-13T08:36:32.847").
+ case _: TimestampType => false
case _: CalendarIntervalType => false
case _ => true
}