You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/06 09:39:06 UTC
[spark] branch branch-3.2 updated: [SPARK-35999][SQL] Make
from_csv/to_csv to handle day-time intervals properly
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5f383f0 [SPARK-35999][SQL] Make from_csv/to_csv to handle day-time intervals properly
5f383f0 is described below
commit 5f383f0102af14f5cc4cc43744fc95437ccbf291
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Tue Jul 6 17:37:38 2021 +0800
[SPARK-35999][SQL] Make from_csv/to_csv to handle day-time intervals properly
### What changes were proposed in this pull request?
This PR fixes an issue where `from_csv` and `to_csv` do not handle day-time intervals properly.
`from_csv` throws an exception if day-time interval types are given.
```
spark-sql> select from_csv("interval '1 2:3:4' day to second", "a interval day to second");
21/07/03 04:39:13 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1 2:3:4' day to second", "a interval day to second")]
java.lang.Exception: Unsupported type: interval day to second
at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```
Also, `to_csv` doesn't handle day-time interval types properly, though no exception is thrown.
The result of `to_csv` for day-time interval types is not in ANSI interval compliant form.
```
spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second));
93784000000
```
The result above should be `INTERVAL '1 02:03:04' DAY TO SECOND`.
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.
Closes #33226 from sarutak/csv-dtinterval.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit def8bc5c9631be0bd3172522006c23634e6f2c31)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/csv/UnivocityGenerator.scala | 5 +++
.../spark/sql/catalyst/csv/UnivocityParser.scala | 5 +++
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 47 +++++++++++++++++++++-
3 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 5d70ccb..2abf7bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -66,6 +66,11 @@ class UnivocityGenerator(
IntervalUtils.toYearMonthIntervalString(
row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+ case DayTimeIntervalType(start, end) =>
+ (row: InternalRow, ordinal: Int) =>
+ IntervalUtils.toDayTimeIntervalString(
+ row.getLong(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+
case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
case dt: DataType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 3ec1ea0..16d1251 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -222,6 +222,11 @@ class UnivocityParser(
Cast(Literal(datum), ym).eval(EmptyRow)
}
+ case dt: DayTimeIntervalType => (d: String) =>
+ nullSafeDatum(d, name, nullable, options) { datum =>
+ Cast(Literal(datum), dt).eval(EmptyRow)
+ }
+
case udt: UserDefinedType[_] =>
makeConverter(name, udt.sqlType, nullable)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index c6afb25..711a4bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import java.text.SimpleDateFormat
-import java.time.Period
+import java.time.{Duration, Period}
import java.util.Locale
import scala.collection.JavaConverters._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
@@ -308,4 +309,48 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-35999: Make from_csv/to_csv to handle day-time intervals properly") {
+ val dtDF = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF
+ Seq(
+ (DayTimeIntervalType(), "INTERVAL '1 02:03:04' DAY TO SECOND",
+ Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)),
+ (DayTimeIntervalType(DAY, MINUTE), "INTERVAL '1 02:03' DAY TO MINUTE",
+ Duration.ofDays(1).plusHours(2).plusMinutes(3)),
+ (DayTimeIntervalType(DAY, HOUR), "INTERVAL '1 02' DAY TO HOUR",
+ Duration.ofDays(1).plusHours(2)),
+ (DayTimeIntervalType(DAY), "INTERVAL '1' DAY",
+ Duration.ofDays(1)),
+ (DayTimeIntervalType(HOUR, SECOND), "INTERVAL '26:03:04' HOUR TO SECOND",
+ Duration.ofHours(26).plusMinutes(3).plusSeconds(4)),
+ (DayTimeIntervalType(HOUR, MINUTE), "INTERVAL '26:03' HOUR TO MINUTE",
+ Duration.ofHours(26).plusMinutes(3)),
+ (DayTimeIntervalType(HOUR), "INTERVAL '26' HOUR",
+ Duration.ofHours(26)),
+ (DayTimeIntervalType(MINUTE, SECOND), "INTERVAL '1563:04' MINUTE TO SECOND",
+ Duration.ofMinutes(1563).plusSeconds(4)),
+ (DayTimeIntervalType(MINUTE), "INTERVAL '1563' MINUTE",
+ Duration.ofMinutes(1563)),
+ (DayTimeIntervalType(SECOND), "INTERVAL '93784' SECOND",
+ Duration.ofSeconds(93784))
+ ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) =>
+ val toCsvDF = dtDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv")
+ checkAnswer(toCsvDF, Row(toCsvExpected))
+
+ DataTypeTestUtils.dayTimeIntervalTypes.foreach { fromCsvDtype =>
+ val fromCsvDF = toCsvDF
+ .select(
+ from_csv(
+ $"csv",
+ StructType(StructField("a", fromCsvDtype) :: Nil),
+ Map.empty[String, String]) as "value")
+ .selectExpr("value.a")
+ if (toCsvDtype == fromCsvDtype) {
+ checkAnswer(fromCsvDF, Row(fromCsvExpected))
+ } else {
+ checkAnswer(fromCsvDF, Row(null))
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org