You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/03 09:46:25 UTC

[spark] branch master updated: [SPARK-27344][SQL][TEST] Support the LocalDate and Instant classes in Java Bean encoders

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bc6723  [SPARK-27344][SQL][TEST] Support the LocalDate and Instant classes in Java Bean encoders
1bc6723 is described below

commit 1bc672366d6818eaa86f00cfb5af74891fff33e6
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Wed Apr 3 17:45:59 2019 +0800

    [SPARK-27344][SQL][TEST] Support the LocalDate and Instant classes in Java Bean encoders
    
    ## What changes were proposed in this pull request?
    
    - Added new test for Java Bean encoder of the classes: `java.time.LocalDate` and `java.time.Instant`.
    - Updated comment for `Encoders.bean`
    - New Row getters: `getLocalDate` and `getInstant`
    - Extended `inferDataType` to infer types for `java.time.LocalDate` -> `DateType` and `java.time.Instant` -> `TimestampType`.
    
    ## How was this patch tested?
    
    By `JavaBeanDeserializationSuite`
    
    Closes #24273 from MaxGekk/bean-instant-localdate.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Maxim Gekk <ma...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Encoders.scala |  2 +-
 .../src/main/scala/org/apache/spark/sql/Row.scala  | 14 ++++
 .../spark/sql/catalyst/JavaTypeInference.scala     |  2 +
 .../spark/sql/JavaBeanDeserializationSuite.java    | 96 ++++++++++++++++++++++
 4 files changed, 113 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index f54c692..055fbc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -149,7 +149,7 @@ object Encoders {
    *  - boxed types: Boolean, Integer, Double, etc.
    *  - String
    *  - java.math.BigDecimal, java.math.BigInteger
-   *  - time related: java.sql.Date, java.sql.Timestamp
+   *  - time related: java.sql.Date, java.sql.Timestamp, java.time.LocalDate, java.time.Instant
    *  - collection types: only array and java.util.List currently, map support is in progress
    *  - nested java bean.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 4f5af9a..f13edde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -270,6 +270,13 @@ trait Row extends Serializable {
   def getDate(i: Int): java.sql.Date = getAs[java.sql.Date](i)
 
   /**
+   * Returns the value at position i of date type as java.time.LocalDate.
+   *
+   * @throws ClassCastException when data type does not match.
+   */
+  def getLocalDate(i: Int): java.time.LocalDate = getAs[java.time.LocalDate](i)
+
+  /**
    * Returns the value at position i of date type as java.sql.Timestamp.
    *
    * @throws ClassCastException when data type does not match.
@@ -277,6 +284,13 @@ trait Row extends Serializable {
   def getTimestamp(i: Int): java.sql.Timestamp = getAs[java.sql.Timestamp](i)
 
   /**
+   * Returns the value at position i of date type as java.time.Instant.
+   *
+   * @throws ClassCastException when data type does not match.
+   */
+  def getInstant(i: Int): java.time.Instant = getAs[java.time.Instant](i)
+
+  /**
    * Returns the value at position i of array type as a Scala Seq.
    *
    * @throws ClassCastException when data type does not match.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 3913213..c5be3ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -102,7 +102,9 @@ object JavaTypeInference {
 
       case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType.SYSTEM_DEFAULT, true)
       case c: Class[_] if c == classOf[java.math.BigInteger] => (DecimalType.BigIntDecimal, true)
+      case c: Class[_] if c == classOf[java.time.LocalDate] => (DateType, true)
       case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+      case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
       case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
 
       case _ if typeToken.isArray =>
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index c5f3867..7bf0789 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -18,10 +18,15 @@
 package test.org.apache.spark.sql;
 
 import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.*;
 
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.catalyst.util.TimestampFormatter;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.junit.*;
@@ -509,4 +514,95 @@ public class JavaBeanDeserializationSuite implements Serializable {
       this.id = id;
     }
   }
+
+  @Test
+  public void testBeanWithLocalDateAndInstant() {
+    String originConf = spark.conf().get(SQLConf.DATETIME_JAVA8API_ENABLED().key());
+    try {
+      spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), "true");
+      List<Row> inputRows = new ArrayList<>();
+      List<LocalDateInstantRecord> expectedRecords = new ArrayList<>();
+
+      for (long idx = 0 ; idx < 5 ; idx++) {
+        Row row = createLocalDateInstantRow(idx);
+        inputRows.add(row);
+        expectedRecords.add(createLocalDateInstantRecord(row));
+      }
+
+      Encoder<LocalDateInstantRecord> encoder = Encoders.bean(LocalDateInstantRecord.class);
+
+      StructType schema = new StructType()
+        .add("localDateField", DataTypes.DateType)
+        .add("instantField", DataTypes.TimestampType);
+
+      Dataset<Row> dataFrame = spark.createDataFrame(inputRows, schema);
+      Dataset<LocalDateInstantRecord> dataset = dataFrame.as(encoder);
+
+      List<LocalDateInstantRecord> records = dataset.collectAsList();
+
+      Assert.assertEquals(expectedRecords, records);
+    } finally {
+        spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), originConf);
+    }
+  }
+
+  public static final class LocalDateInstantRecord {
+    private String localDateField;
+    private String instantField;
+
+    public LocalDateInstantRecord() { }
+
+    public String getLocalDateField() {
+      return localDateField;
+    }
+
+    public void setLocalDateField(String localDateField) {
+      this.localDateField = localDateField;
+    }
+
+    public String getInstantField() {
+      return instantField;
+    }
+
+    public void setInstantField(String instantField) {
+      this.instantField = instantField;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      LocalDateInstantRecord that = (LocalDateInstantRecord) o;
+      return Objects.equals(localDateField, that.localDateField) &&
+        Objects.equals(instantField, that.instantField);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(localDateField, instantField);
+    }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this)
+        .add("localDateField", localDateField)
+        .add("instantField", instantField)
+        .toString();
+    }
+  }
+
+  private static Row createLocalDateInstantRow(Long index) {
+    Object[] values = new Object[] { LocalDate.ofEpochDay(42), Instant.ofEpochSecond(42) };
+    return new GenericRow(values);
+  }
+
+  private static LocalDateInstantRecord createLocalDateInstantRecord(Row recordRow) {
+    LocalDateInstantRecord record = new LocalDateInstantRecord();
+    record.setLocalDateField(String.valueOf(recordRow.getLocalDate(0)));
+    Instant instant = recordRow.getInstant(1);
+    TimestampFormatter formatter = TimestampFormatter.getFractionFormatter(
+      DateTimeUtils.getZoneId(SQLConf.get().sessionLocalTimeZone()));
+    record.setInstantField(formatter.format(DateTimeUtils.instantToMicros(instant)));
+    return record;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org