Posted to commits@spark.apache.org by we...@apache.org on 2021/07/06 09:39:06 UTC

[spark] branch branch-3.2 updated: [SPARK-35999][SQL] Make from_csv/to_csv handle day-time intervals properly

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5f383f0  [SPARK-35999][SQL] Make from_csv/to_csv handle day-time intervals properly
5f383f0 is described below

commit 5f383f0102af14f5cc4cc43744fc95437ccbf291
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Tue Jul 6 17:37:38 2021 +0800

    [SPARK-35999][SQL] Make from_csv/to_csv handle day-time intervals properly
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue where `from_csv`/`to_csv` don't handle day-time intervals properly.
    `from_csv` throws an exception when a day-time interval type appears in the schema:
    ```
    spark-sql> select from_csv("interval '1 2:3:4' day to second", "a interval day to second");
    21/07/03 04:39:13 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1 2:3:4' day to second", "a interval day to second")]
    java.lang.Exception: Unsupported type: interval day to second
     at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
     at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
     at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
    ```
    
    Also, `to_csv` doesn't handle day-time interval types properly, though no exception is thrown.
    The result of `to_csv` for a day-time interval type is not in the ANSI interval form.
    
    ```
    spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second));
    93784000000
    ```
    The result above should be `INTERVAL '1 02:03:04' DAY TO SECOND`.
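
    With this fix, `to_csv` emits the ANSI form shown above and `from_csv`
    accepts day-time interval types instead of throwing. A hypothetical
    post-patch session (the output is the expected value stated above, not
    copied from a real run):
    ```
    spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second));
    INTERVAL '1 02:03:04' DAY TO SECOND
    ```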
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #33226 from sarutak/csv-dtinterval.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit def8bc5c9631be0bd3172522006c23634e6f2c31)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/csv/UnivocityGenerator.scala      |  5 +++
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  5 +++
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 47 +++++++++++++++++++++-
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 5d70ccb..2abf7bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -66,6 +66,11 @@ class UnivocityGenerator(
         IntervalUtils.toYearMonthIntervalString(
           row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
 
+    case DayTimeIntervalType(start, end) =>
+      (row: InternalRow, ordinal: Int) =>
+        IntervalUtils.toDayTimeIntervalString(
+          row.getLong(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
     case dt: DataType =>
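
The generator branch mirrors the year-month case above: it reads the
interval as a `Long` count of microseconds and delegates the formatting to
`IntervalUtils.toDayTimeIntervalString`. For intuition, here is a minimal,
self-contained sketch of that micros-to-ANSI mapping (a hypothetical
helper, not Spark's implementation; simplified to DAY TO SECOND with no
fractional seconds):

```
object DayTimeIntervalFormatSketch {
  // Spark stores a day-time interval as a single Long of microseconds.
  def toAnsiDayToSecond(micros: Long): String = {
    val sign = if (micros < 0) "-" else ""
    var rest = math.abs(micros)
    val days = rest / 86400000000L;  rest %= 86400000000L
    val hours = rest / 3600000000L;  rest %= 3600000000L
    val minutes = rest / 60000000L;  rest %= 60000000L
    val seconds = rest / 1000000L    // fractional seconds dropped for brevity
    f"INTERVAL '$sign$days $hours%02d:$minutes%02d:$seconds%02d' DAY TO SECOND"
  }

  def main(args: Array[String]): Unit = {
    // 93784000000 is the raw value to_csv used to emit (see the description above).
    println(toAnsiDayToSecond(93784000000L))  // INTERVAL '1 02:03:04' DAY TO SECOND
  }
}
```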
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 3ec1ea0..16d1251 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -222,6 +222,11 @@ class UnivocityParser(
         Cast(Literal(datum), ym).eval(EmptyRow)
       }
 
+    case dt: DayTimeIntervalType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        Cast(Literal(datum), dt).eval(EmptyRow)
+      }
+
     case udt: UserDefinedType[_] =>
       makeConverter(name, udt.sqlType, nullable)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index c6afb25..711a4bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
-import java.time.Period
+import java.time.{Duration, Period}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
 import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
 
 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
@@ -308,4 +309,48 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35999: Make from_csv/to_csv handle day-time intervals properly") {
+    val dtDF = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF
+    Seq(
+      (DayTimeIntervalType(), "INTERVAL '1 02:03:04' DAY TO SECOND",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(DAY, MINUTE), "INTERVAL '1 02:03' DAY TO MINUTE",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3)),
+      (DayTimeIntervalType(DAY, HOUR), "INTERVAL '1 02' DAY TO HOUR",
+        Duration.ofDays(1).plusHours(2)),
+      (DayTimeIntervalType(DAY), "INTERVAL '1' DAY",
+        Duration.ofDays(1)),
+      (DayTimeIntervalType(HOUR, SECOND), "INTERVAL '26:03:04' HOUR TO SECOND",
+        Duration.ofHours(26).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(HOUR, MINUTE), "INTERVAL '26:03' HOUR TO MINUTE",
+        Duration.ofHours(26).plusMinutes(3)),
+      (DayTimeIntervalType(HOUR), "INTERVAL '26' HOUR",
+        Duration.ofHours(26)),
+      (DayTimeIntervalType(MINUTE, SECOND), "INTERVAL '1563:04' MINUTE TO SECOND",
+        Duration.ofMinutes(1563).plusSeconds(4)),
+      (DayTimeIntervalType(MINUTE), "INTERVAL '1563' MINUTE",
+        Duration.ofMinutes(1563)),
+      (DayTimeIntervalType(SECOND), "INTERVAL '93784' SECOND",
+        Duration.ofSeconds(93784))
+    ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) =>
+      val toCsvDF = dtDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv")
+      checkAnswer(toCsvDF, Row(toCsvExpected))
+
+      DataTypeTestUtils.dayTimeIntervalTypes.foreach { fromCsvDtype =>
+        val fromCsvDF = toCsvDF
+          .select(
+            from_csv(
+              $"csv",
+              StructType(StructField("a", fromCsvDtype) :: Nil),
+              Map.empty[String, String]) as "value")
+          .selectExpr("value.a")
+        if (toCsvDtype == fromCsvDtype) {
+          checkAnswer(fromCsvDF, Row(fromCsvExpected))
+        } else {
+          checkAnswer(fromCsvDF, Row(null))
+        }
+      }
+    }
+  }
 }
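
For reference, the user-visible round trip exercised by the new test can be
sketched against the public API like this (a hypothetical spark-shell
snippet, assuming a `SparkSession` named `spark`):

```
import java.time.Duration

import org.apache.spark.sql.functions.{col, from_csv, struct, to_csv}
import org.apache.spark.sql.types.{DayTimeIntervalType, StructField, StructType}

import spark.implicits._

val df = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF("value")
val csv = df.select(to_csv(struct(col("value"))).as("csv"))
// csv.first.getString(0) == "INTERVAL '1 02:03:04' DAY TO SECOND"

val schema = StructType(StructField("a", DayTimeIntervalType()) :: Nil)
val roundTripped = csv
  .select(from_csv(col("csv"), schema, Map.empty[String, String]).as("value"))
  .selectExpr("value.a")
// roundTripped.first.get(0) is the original java.time.Duration
```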

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org