You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/03 09:46:25 UTC
[spark] branch master updated: [SPARK-27344][SQL][TEST] Support the
LocalDate and Instant classes in Java Bean encoders
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1bc6723 [SPARK-27344][SQL][TEST] Support the LocalDate and Instant classes in Java Bean encoders
1bc6723 is described below
commit 1bc672366d6818eaa86f00cfb5af74891fff33e6
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Wed Apr 3 17:45:59 2019 +0800
[SPARK-27344][SQL][TEST] Support the LocalDate and Instant classes in Java Bean encoders
## What changes were proposed in this pull request?
- Added a new test for the Java Bean encoder covering the `java.time.LocalDate` and `java.time.Instant` classes.
- Updated the comment for `Encoders.bean`.
- Added new `Row` getters: `getLocalDate` and `getInstant`.
- Extended `inferDataType` to infer `DateType` for `java.time.LocalDate` and `TimestampType` for `java.time.Instant`.
## How was this patch tested?
By `JavaBeanDeserializationSuite`
Closes #24273 from MaxGekk/bean-instant-localdate.
Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Encoders.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/Row.scala | 14 ++++
.../spark/sql/catalyst/JavaTypeInference.scala | 2 +
.../spark/sql/JavaBeanDeserializationSuite.java | 96 ++++++++++++++++++++++
4 files changed, 113 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index f54c692..055fbc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -149,7 +149,7 @@ object Encoders {
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal, java.math.BigInteger
- * - time related: java.sql.Date, java.sql.Timestamp
+ * - time related: java.sql.Date, java.sql.Timestamp, java.time.LocalDate, java.time.Instant
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 4f5af9a..f13edde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -270,6 +270,13 @@ trait Row extends Serializable {
def getDate(i: Int): java.sql.Date = getAs[java.sql.Date](i)
/**
+ * Returns the value at position i of date type as java.time.LocalDate.
+ *
+ * @throws ClassCastException when data type does not match.
+ */
+ def getLocalDate(i: Int): java.time.LocalDate = getAs[java.time.LocalDate](i)
+
+ /**
* Returns the value at position i of date type as java.sql.Timestamp.
*
* @throws ClassCastException when data type does not match.
@@ -277,6 +284,13 @@ trait Row extends Serializable {
def getTimestamp(i: Int): java.sql.Timestamp = getAs[java.sql.Timestamp](i)
/**
+ * Returns the value at position i of timestamp type as java.time.Instant.
+ *
+ * @throws ClassCastException when data type does not match.
+ */
+ def getInstant(i: Int): java.time.Instant = getAs[java.time.Instant](i)
+
+ /**
* Returns the value at position i of array type as a Scala Seq.
*
* @throws ClassCastException when data type does not match.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 3913213..c5be3ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -102,7 +102,9 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType.SYSTEM_DEFAULT, true)
case c: Class[_] if c == classOf[java.math.BigInteger] => (DecimalType.BigIntDecimal, true)
+ case c: Class[_] if c == classOf[java.time.LocalDate] => (DateType, true)
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+ case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
case _ if typeToken.isArray =>
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index c5f3867..7bf0789 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -18,10 +18,15 @@
package test.org.apache.spark.sql;
import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.TimestampFormatter;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.*;
@@ -509,4 +514,95 @@ public class JavaBeanDeserializationSuite implements Serializable {
this.id = id;
}
}
+
+ @Test
+ public void testBeanWithLocalDateAndInstant() {
+ String originConf = spark.conf().get(SQLConf.DATETIME_JAVA8API_ENABLED().key());
+ try {
+ spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), "true");
+ List<Row> inputRows = new ArrayList<>();
+ List<LocalDateInstantRecord> expectedRecords = new ArrayList<>();
+
+ for (long idx = 0 ; idx < 5 ; idx++) {
+ Row row = createLocalDateInstantRow(idx);
+ inputRows.add(row);
+ expectedRecords.add(createLocalDateInstantRecord(row));
+ }
+
+ Encoder<LocalDateInstantRecord> encoder = Encoders.bean(LocalDateInstantRecord.class);
+
+ StructType schema = new StructType()
+ .add("localDateField", DataTypes.DateType)
+ .add("instantField", DataTypes.TimestampType);
+
+ Dataset<Row> dataFrame = spark.createDataFrame(inputRows, schema);
+ Dataset<LocalDateInstantRecord> dataset = dataFrame.as(encoder);
+
+ List<LocalDateInstantRecord> records = dataset.collectAsList();
+
+ Assert.assertEquals(expectedRecords, records);
+ } finally {
+ spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), originConf);
+ }
+ }
+
+ public static final class LocalDateInstantRecord {
+ private String localDateField;
+ private String instantField;
+
+ public LocalDateInstantRecord() { }
+
+ public String getLocalDateField() {
+ return localDateField;
+ }
+
+ public void setLocalDateField(String localDateField) {
+ this.localDateField = localDateField;
+ }
+
+ public String getInstantField() {
+ return instantField;
+ }
+
+ public void setInstantField(String instantField) {
+ this.instantField = instantField;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LocalDateInstantRecord that = (LocalDateInstantRecord) o;
+ return Objects.equals(localDateField, that.localDateField) &&
+ Objects.equals(instantField, that.instantField);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(localDateField, instantField);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("localDateField", localDateField)
+ .add("instantField", instantField)
+ .toString();
+ }
+ }
+
+ private static Row createLocalDateInstantRow(Long index) {
+ Object[] values = new Object[] { LocalDate.ofEpochDay(42), Instant.ofEpochSecond(42) };
+ return new GenericRow(values);
+ }
+
+ private static LocalDateInstantRecord createLocalDateInstantRecord(Row recordRow) {
+ LocalDateInstantRecord record = new LocalDateInstantRecord();
+ record.setLocalDateField(String.valueOf(recordRow.getLocalDate(0)));
+ Instant instant = recordRow.getInstant(1);
+ TimestampFormatter formatter = TimestampFormatter.getFractionFormatter(
+ DateTimeUtils.getZoneId(SQLConf.get().sessionLocalTimeZone()));
+ record.setInstantField(formatter.format(DateTimeUtils.instantToMicros(instant)));
+ return record;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org