You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/10 07:35:10 UTC
[spark] branch master updated: [SPARK-31392][SQL] Support
CalendarInterval to be reflect to CalendarntervalType
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a454510 [SPARK-31392][SQL] Support CalendarInterval to be reflect to CalendarntervalType
a454510 is described below
commit a454510917badb22be53869420f76babac6fa38d
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Fri Apr 10 07:34:01 2020 +0000
[SPARK-31392][SQL] Support CalendarInterval to be reflect to CalendarntervalType
### What changes were proposed in this pull request?
Since 3.0.0, CalendarInterval has been public for input, so it should be inferred as CalendarIntervalType.
In this PR, we add a rule in ScalaReflection that maps CalendarInterval to CalendarIntervalType, so records (e.g. case classes, tuples, ...) containing interval fields can be converted to a DataFrame.
### Why are the changes needed?
CalendarInterval is public but cannot be used as input for a DataFrame:
```scala
scala> import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.CalendarInterval
scala> Seq((1, new CalendarInterval(1, 2, 3))).toDF("a", "b")
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.unsafe.types.CalendarInterval is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:735)
```
This should be supported, just as creating an interval column through SQL already is:
```scala
scala> sql("select interval 2 month 1 day a")
res2: org.apache.spark.sql.DataFrame = [a: interval]
```
### Does this PR introduce any user-facing change?
Yes. Records (e.g. case classes, tuples, ...) containing interval fields can now be converted to a DataFrame.
### How was this patch tested?
Added unit tests.
Closes #28165 from yaooqinn/SPARK-31392.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/catalyst/ScalaReflection.scala | 2 ++
.../spark/sql/catalyst/ScalaReflectionSuite.scala | 17 +++++++++++------
.../sql/catalyst/encoders/ExpressionEncoderSuite.scala | 6 +++---
.../scala/org/apache/spark/sql/DataFrameSuite.scala | 6 ++++++
.../sql/execution/arrow/ArrowConvertersSuite.scala | 12 +++++-------
5 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index f587245..3694832 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -700,6 +700,8 @@ object ScalaReflection extends ScalaReflection {
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
+ case t if isSubtype(t, localTypeOf[CalendarInterval]) =>
+ Schema(CalendarIntervalType, nullable = true)
case t if isSubtype(t, localTypeOf[BigDecimal]) =>
Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if isSubtype(t, localTypeOf[java.math.BigDecimal]) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index f9cd9c3..b981a50 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, If, SpecificInternalRow, UpCast}
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, NewInstance}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
case class PrimitiveData(
intField: Int,
@@ -48,7 +49,8 @@ case class NullableData(
decimalField: java.math.BigDecimal,
dateField: Date,
timestampField: Timestamp,
- binaryField: Array[Byte])
+ binaryField: Array[Byte],
+ intervalField: CalendarInterval)
case class OptionalData(
intField: Option[Int],
@@ -58,7 +60,8 @@ case class OptionalData(
shortField: Option[Short],
byteField: Option[Byte],
booleanField: Option[Boolean],
- structField: Option[PrimitiveData])
+ structField: Option[PrimitiveData],
+ intervalField: Option[CalendarInterval])
case class ComplexData(
arrayField: Seq[Int],
@@ -200,7 +203,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
StructField("decimalField", DecimalType.SYSTEM_DEFAULT, nullable = true),
StructField("dateField", DateType, nullable = true),
StructField("timestampField", TimestampType, nullable = true),
- StructField("binaryField", BinaryType, nullable = true))),
+ StructField("binaryField", BinaryType, nullable = true),
+ StructField("intervalField", CalendarIntervalType, nullable = true))),
nullable = true))
}
@@ -215,7 +219,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
StructField("shortField", ShortType, nullable = true),
StructField("byteField", ByteType, nullable = true),
StructField("booleanField", BooleanType, nullable = true),
- StructField("structField", schemaFor[PrimitiveData].dataType, nullable = true))),
+ StructField("structField", schemaFor[PrimitiveData].dataType, nullable = true),
+ StructField("intervalField", CalendarIntervalType, nullable = true))),
nullable = true))
}
@@ -295,10 +300,10 @@ class ScalaReflectionSuite extends SparkFunSuite {
test("convert Option[Product] to catalyst") {
val primitiveData = PrimitiveData(1, 1, 1, 1, 1, 1, true)
val data = OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
- Some(primitiveData))
+ Some(primitiveData), Some(new CalendarInterval(1, 2, 3)))
val dataType = schemaFor[OptionalData].dataType
val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
- InternalRow(1, 1, 1, 1, 1, 1, true))
+ InternalRow(1, 1, 1, 1, 1, 1, true), new CalendarInterval(1, 2, 3))
assert(CatalystTypeConverters.createToCatalystConverter(dataType)(data) === convertedData)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 66a1bbe..1036dc7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.ClosureCleaner
case class RepeatedStruct(s: Seq[PrimitiveData])
@@ -209,9 +209,9 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
productTest(
OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
- Some(PrimitiveData(1, 1, 1, 1, 1, 1, true))))
+ Some(PrimitiveData(1, 1, 1, 1, 1, 1, true)), Some(new CalendarInterval(1, 2, 3))))
- productTest(OptionalData(None, None, None, None, None, None, None, None))
+ productTest(OptionalData(None, None, None, None, None, None, None, None, None))
encodeDecodeTest(Seq(Some(1), None), "Option in array")
encodeDecodeTest(Map(1 -> Some(10L), 2 -> Some(20L), 3 -> None), "Option in map")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1762bc6..f797290 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
import org.apache.spark.sql.test.SQLTestData.{DecimalData, NullStrings, TestData2}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom
@@ -2352,6 +2353,11 @@ class DataFrameSuite extends QueryTest
}
assert(e.getMessage.contains("Table or view not found:"))
}
+
+ test("CalendarInterval reflection support") {
+ val df = Seq((1, new CalendarInterval(1, 2, 3))).toDF("a", "b")
+ checkAnswer(df.selectExpr("b"), Row(new CalendarInterval(1, 2, 3)))
+ }
}
case class GroupByKey(a: Int, b: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index fdb23d5..1e6e594 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1208,15 +1208,13 @@ class ArrowConvertersSuite extends SharedSparkSession {
spark.conf.unset(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key)
}
- testQuietly("unsupported types") {
- def runUnsupported(block: => Unit): Unit = {
- val msg = intercept[UnsupportedOperationException] {
- block
- }
- assert(msg.getMessage.contains("is not supported"))
+ testQuietly("interval is unsupported for arrow") {
+ val e = intercept[SparkException] {
+ calenderIntervalData.toDF().toArrowBatchRdd.collect()
}
- runUnsupported { calenderIntervalData.toDF().toArrowBatchRdd.collect() }
+ assert(e.getCause.isInstanceOf[UnsupportedOperationException])
+ assert(e.getCause.getMessage.contains("Unsupported data type: interval"))
}
test("test Arrow Validator") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org