You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/01 15:03:11 UTC

[spark] branch master updated: [SPARK-27325][SQL] Add implicit encoders for LocalDate and Instant

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d332958  [SPARK-27325][SQL] Add implicit encoders for LocalDate and Instant
d332958 is described below

commit d332958109d7dc20fef9367fe645108f72a820d4
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Mon Apr 1 23:02:48 2019 +0800

    [SPARK-27325][SQL] Add implicit encoders for LocalDate and Instant
    
    ## What changes were proposed in this pull request?
    
    Added implicit encoders for the `java.time.LocalDate` and `java.time.Instant` classes. This allows Datasets to be created directly from instances of these types.
    
    ## How was this patch tested?
    
    Added new tests to `JavaDatasetSuite` and `DatasetSuite`.
    
    Closes #24249 from MaxGekk/instant-localdate-encoders.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/Encoders.scala   | 16 ++++++++++++++++
 .../main/scala/org/apache/spark/sql/SQLImplicits.scala   |  5 +++++
 .../java/test/org/apache/spark/sql/JavaDatasetSuite.java | 12 ++++++++++++
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   |  8 ++++++++
 4 files changed, 41 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 42b865c..f54c692 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -110,6 +110,14 @@ object Encoders {
   def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
 
   /**
+   * Creates an encoder that serializes instances of the `java.time.LocalDate` class
+   * to the internal representation of Catalyst's nullable DateType.
+   *
+   * @since 3.0.0
+   */
+  def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
+
+  /**
    * An encoder for nullable timestamp type.
    *
    * @since 1.6.0
@@ -117,6 +125,14 @@ object Encoders {
   def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
 
   /**
+   * Creates an encoder that serializes instances of the `java.time.Instant` class
+   * to the internal representation of Catalyst's nullable TimestampType.
+   *
+   * @since 3.0.0
+   */
+  def INSTANT: Encoder[java.time.Instant] = ExpressionEncoder()
+
+  /**
    * An encoder for arrays of bytes.
    *
    * @since 1.6.1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index d329af0..c997b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -81,9 +81,14 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   /** @since 2.2.0 */
   implicit def newDateEncoder: Encoder[java.sql.Date] = Encoders.DATE
 
+  /** @since 3.0.0 */
+  implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE
+
   /** @since 2.2.0 */
   implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP
 
+  /** @since 3.0.0 */
+  implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT
 
   // Boxed primitives
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 2c695fc..1e5f55e 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.*;
 
 import org.apache.spark.sql.streaming.GroupStateTimeout;
@@ -399,6 +401,16 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertEquals(data, ds.collectAsList());
   }
 
+  @Test
+  public void testLocalDateAndInstantEncoders() {
+    Encoder<Tuple2<LocalDate, Instant>> encoder =
+      Encoders.tuple(Encoders.LOCALDATE(), Encoders.INSTANT());
+    List<Tuple2<LocalDate, Instant>> data =
+      Arrays.asList(new Tuple2<>(LocalDate.ofEpochDay(0), Instant.ofEpochSecond(0)));
+    Dataset<Tuple2<LocalDate, Instant>> ds = spark.createDataset(data, encoder);
+    Assert.assertEquals(data, ds.collectAsList());
+  }
+
   public static class KryoSerializable {
     String value;
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6e35b52..d52047d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1730,6 +1730,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     def assertExecutionId: UserDefinedFunction = udf(AssertExecutionId.apply _)
     spark.range(10).select(assertExecutionId($"id")).localCheckpoint(true)
   }
+
+  test("implicit encoder for LocalDate and Instant") {
+    val localDate = java.time.LocalDate.of(2019, 3, 30)
+    assert(spark.range(1).map { _ => localDate }.head === localDate)
+
+    val instant = java.time.Instant.parse("2019-03-30T09:54:00Z")
+    assert(spark.range(1).map { _ => instant }.head === instant)
+  }
 }
 
 object AssertExecutionId {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org