You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/10 07:35:10 UTC

[spark] branch master updated: [SPARK-31392][SQL] Support CalendarInterval to be reflect to CalendarntervalType

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a454510  [SPARK-31392][SQL] Support CalendarInterval to be reflect to CalendarntervalType
a454510 is described below

commit a454510917badb22be53869420f76babac6fa38d
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Fri Apr 10 07:34:01 2020 +0000

    [SPARK-31392][SQL] Support CalendarInterval to be reflect to CalendarntervalType
    
    ### What changes were proposed in this pull request?
    
    Since 3.0.0, we make CalendarInterval public for input, it's better for it to be inferred to CalendarIntervalType.
    In the PR, we add a rule for CalendarInterval to be mapped to CalendarIntervalType in ScalaRelection, then records(e.g case class, tuples ...) contains interval fields are able to convert to a Dataframe.
    
    ### Why are the changes needed?
    
    CalendarInterval is public but can not be used as input for Datafame.
    
    ```scala
    scala> import org.apache.spark.unsafe.types.CalendarInterval
    import org.apache.spark.unsafe.types.CalendarInterval
    
    scala> Seq((1, new CalendarInterval(1, 2, 3))).toDF("a", "b")
    java.lang.UnsupportedOperationException: Schema for type org.apache.spark.unsafe.types.CalendarInterval is not supported
      at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:735)
    ```
    
    this should be supported as well as
    ```scala
    scala> sql("select interval 2 month 1 day a")
    res2: org.apache.spark.sql.DataFrame = [a: interval]
    ```
    ### Does this PR introduce any user-facing change?
    
    Yes, records(e.g case class, tuples ...) contains interval fields are able to convert to a Dataframe
    ### How was this patch tested?
    
    add uts
    
    Closes #28165 from yaooqinn/SPARK-31392.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala |  2 ++
 .../spark/sql/catalyst/ScalaReflectionSuite.scala       | 17 +++++++++++------
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala  |  6 +++---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala     |  6 ++++++
 .../sql/execution/arrow/ArrowConvertersSuite.scala      | 12 +++++-------
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index f587245..3694832 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -700,6 +700,8 @@ object ScalaReflection extends ScalaReflection {
         Schema(TimestampType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
+      case t if isSubtype(t, localTypeOf[CalendarInterval]) =>
+        Schema(CalendarIntervalType, nullable = true)
       case t if isSubtype(t, localTypeOf[BigDecimal]) =>
         Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
       case t if isSubtype(t, localTypeOf[java.math.BigDecimal]) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index f9cd9c3..b981a50 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
 import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, If, SpecificInternalRow, UpCast}
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, NewInstance}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
 
 case class PrimitiveData(
     intField: Int,
@@ -48,7 +49,8 @@ case class NullableData(
     decimalField: java.math.BigDecimal,
     dateField: Date,
     timestampField: Timestamp,
-    binaryField: Array[Byte])
+    binaryField: Array[Byte],
+    intervalField: CalendarInterval)
 
 case class OptionalData(
     intField: Option[Int],
@@ -58,7 +60,8 @@ case class OptionalData(
     shortField: Option[Short],
     byteField: Option[Byte],
     booleanField: Option[Boolean],
-    structField: Option[PrimitiveData])
+    structField: Option[PrimitiveData],
+    intervalField: Option[CalendarInterval])
 
 case class ComplexData(
     arrayField: Seq[Int],
@@ -200,7 +203,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
         StructField("decimalField", DecimalType.SYSTEM_DEFAULT, nullable = true),
         StructField("dateField", DateType, nullable = true),
         StructField("timestampField", TimestampType, nullable = true),
-        StructField("binaryField", BinaryType, nullable = true))),
+        StructField("binaryField", BinaryType, nullable = true),
+        StructField("intervalField", CalendarIntervalType, nullable = true))),
       nullable = true))
   }
 
@@ -215,7 +219,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
         StructField("shortField", ShortType, nullable = true),
         StructField("byteField", ByteType, nullable = true),
         StructField("booleanField", BooleanType, nullable = true),
-        StructField("structField", schemaFor[PrimitiveData].dataType, nullable = true))),
+        StructField("structField", schemaFor[PrimitiveData].dataType, nullable = true),
+        StructField("intervalField", CalendarIntervalType, nullable = true))),
       nullable = true))
   }
 
@@ -295,10 +300,10 @@ class ScalaReflectionSuite extends SparkFunSuite {
   test("convert Option[Product] to catalyst") {
     val primitiveData = PrimitiveData(1, 1, 1, 1, 1, 1, true)
     val data = OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
-      Some(primitiveData))
+      Some(primitiveData), Some(new CalendarInterval(1, 2, 3)))
     val dataType = schemaFor[OptionalData].dataType
     val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
-      InternalRow(1, 1, 1, 1, 1, 1, true))
+      InternalRow(1, 1, 1, 1, 1, 1, true), new CalendarInterval(1, 2, 3))
     assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 66a1bbe..1036dc7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.ClosureCleaner
 
 case class RepeatedStruct(s: Seq[PrimitiveData])
@@ -209,9 +209,9 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
 
   productTest(
     OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
-      Some(PrimitiveData(1, 1, 1, 1, 1, 1, true))))
+      Some(PrimitiveData(1, 1, 1, 1, 1, 1, true)), Some(new CalendarInterval(1, 2, 3))))
 
-  productTest(OptionalData(None, None, None, None, None, None, None, None))
+  productTest(OptionalData(None, None, None, None, None, None, None, None, None))
 
   encodeDecodeTest(Seq(Some(1), None), "Option in array")
   encodeDecodeTest(Map(1 -> Some(10L), 2 -> Some(20L), 3 -> None), "Option in map")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1762bc6..f797290 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
 import org.apache.spark.sql.test.SQLTestData.{DecimalData, NullStrings, TestData2}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -2352,6 +2353,11 @@ class DataFrameSuite extends QueryTest
     }
     assert(e.getMessage.contains("Table or view not found:"))
   }
+
+  test("CalendarInterval reflection support") {
+    val df = Seq((1, new CalendarInterval(1, 2, 3))).toDF("a", "b")
+    checkAnswer(df.selectExpr("b"), Row(new CalendarInterval(1, 2, 3)))
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index fdb23d5..1e6e594 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1208,15 +1208,13 @@ class ArrowConvertersSuite extends SharedSparkSession {
     spark.conf.unset(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key)
   }
 
-  testQuietly("unsupported types") {
-    def runUnsupported(block: => Unit): Unit = {
-      val msg = intercept[UnsupportedOperationException] {
-        block
-      }
-      assert(msg.getMessage.contains("is not supported"))
+  testQuietly("interval is unsupported for arrow") {
+    val e = intercept[SparkException] {
+      calenderIntervalData.toDF().toArrowBatchRdd.collect()
     }
 
-    runUnsupported { calenderIntervalData.toDF().toArrowBatchRdd.collect() }
+    assert(e.getCause.isInstanceOf[UnsupportedOperationException])
+    assert(e.getCause.getMessage.contains("Unsupported data type: interval"))
   }
 
   test("test Arrow Validator") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org