Posted to commits@spark.apache.org by hu...@apache.org on 2022/07/01 00:16:57 UTC

[spark] branch master updated: [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 44e2657f3d5 [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options
44e2657f3d5 is described below

commit 44e2657f3d511c25135c95dc3d584c540d227b5b
Author: Prashant Singh <ps...@amazon.com>
AuthorDate: Thu Jun 30 17:16:32 2022 -0700

    [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options
    
    ### What changes were proposed in this pull request?
    
    Support timestamp in seconds for TimeTravel using Dataframe options
    
    ### Why are the changes needed?
    
    To achieve parity between time travel via SQL and time travel via DataFrame options.
    
    Spark SQL already supports queries like:
    ```sql
    SELECT * FROM {table} TIMESTAMP AS OF 1548751078
    ```
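    
    With this change, the same instant can be requested through DataFrame options by passing the timestamp as a digit-only string. A minimal sketch, assuming a V2 source whose `SupportsCatalogOptions` implementation reads a `timestamp` option (the actual key is source-defined; `timestamp` is illustrative here):
    ```scala
    // Hypothetical: reads the table as of 1548751078 seconds since the epoch,
    // matching the SQL form above. The option key depends on the source.
    val df = spark.read
      .option("timestamp", "1548751078")
      .table("testcat.t")
    ```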
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added new unit tests covering the behaviour.
    
    Closes #37025 from singhpk234/fix/timetravel_df_options.
    
    Authored-by: Prashant Singh <ps...@amazon.com>
    Signed-off-by: huaxingao <hu...@apple.com>
---
 .../sql/execution/datasources/v2/DataSourceV2Utils.scala     | 12 ++++++++++--
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala    | 11 +++++++++++
 .../spark/sql/connector/SupportsCatalogOptionsSuite.scala    |  7 +++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f69a2a45886..7fd61c44fd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSuppo
 import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 private[sql] object DataSourceV2Utils extends Logging {
@@ -124,7 +124,15 @@ private[sql] object DataSourceV2Utils extends Logging {
         val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
 
         val timeTravelVersion = if (version.isPresent) Some(version.get) else None
-        val timeTravelTimestamp = if (timestamp.isPresent) Some(Literal(timestamp.get)) else None
+        val timeTravelTimestamp = if (timestamp.isPresent) {
+          if (timestamp.get.forall(_.isDigit)) {
+            Some(Literal(timestamp.get.toLong, LongType))
+          } else {
+            Some(Literal(timestamp.get))
+          }
+        } else {
+          None
+        }
         val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
         (CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident))
       case _ =>
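
A standalone sketch of the parsing rule introduced above (the helper name is hypothetical, not Spark API; a `nonEmpty` guard is added here for safety, since `"".forall(_.isDigit)` is vacuously true):
```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.LongType

// Digit-only strings become LongType literals (interpreted as seconds since
// the epoch); anything else stays a string literal, to be resolved as a
// timestamp downstream by TimeTravelSpec.create.
def toTimestampLiteral(ts: String): Literal =
  if (ts.nonEmpty && ts.forall(_.isDigit)) Literal(ts.toLong, LongType)
  else Literal(ts)

toTimestampLiteral("1548751078")          // Literal(1548751078L, LongType)
toTimestampLiteral("2021-01-29 00:00:00") // string literal
```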
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9c92c1d9a0b..c82d875faa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
 import java.time.{Duration, LocalDate, Period}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.MICROSECONDS
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -2591,6 +2592,8 @@ class DataSourceV2SQLSuite
     val ts2 = DateTimeUtils.stringToTimestampAnsi(
       UTF8String.fromString("2021-01-29 00:00:00"),
       DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val ts1InSeconds = MICROSECONDS.toSeconds(ts1).toString
+    val ts2InSeconds = MICROSECONDS.toSeconds(ts2).toString
     val t3 = s"testcat.t$ts1"
     val t4 = s"testcat.t$ts2"
 
@@ -2607,6 +2610,14 @@ class DataSourceV2SQLSuite
         === Array(Row(5), Row(6)))
       assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect
         === Array(Row(7), Row(8)))
+      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect
+        === Array(Row(5), Row(6)))
+      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect
+        === Array(Row(7), Row(8)))
+      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect
+        === Array(Row(5), Row(6)))
+      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect
+        === Array(Row(7), Row(8)))
       assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect
         === Array(Row(7), Row(8)))
       assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fc843a86aa5..f8278d18b0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector
 
 import java.util.Optional
 
+import scala.concurrent.duration.MICROSECONDS
 import scala.language.implicitConversions
 import scala.util.Try
 
@@ -322,6 +323,12 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
         timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
       checkAnswer(load("t", Some(catalogName), version = None,
         timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+
+      // load with timestamp in number format
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some(MICROSECONDS.toSeconds(ts1).toString)), df3.toDF())
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some(MICROSECONDS.toSeconds(ts2).toString)), df4.toDF())
     }
 
     val e = intercept[AnalysisException] {

