You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hu...@apache.org on 2022/07/01 00:16:57 UTC
[spark] branch master updated: [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options
This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 44e2657f3d5 [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options
44e2657f3d5 is described below
commit 44e2657f3d511c25135c95dc3d584c540d227b5b
Author: Prashant Singh <ps...@amazon.com>
AuthorDate: Thu Jun 30 17:16:32 2022 -0700
[SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options
### What changes were proposed in this pull request?
Support timestamp in seconds for TimeTravel using Dataframe options
### Why are the changes needed?
To have parity between doing TimeTravel via SQL and via DataFrame options.
Spark SQL supports queries like:
```sql
SELECT * from {table} TIMESTAMP AS OF 1548751078
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new UTs for testing the behaviour.
Closes #37025 from singhpk234/fix/timetravel_df_options.
Authored-by: Prashant Singh <ps...@amazon.com>
Signed-off-by: huaxingao <hu...@apple.com>
---
.../sql/execution/datasources/v2/DataSourceV2Utils.scala | 12 ++++++++++--
.../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 11 +++++++++++
.../spark/sql/connector/SupportsCatalogOptionsSuite.scala | 7 +++++++
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f69a2a45886..7fd61c44fd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSuppo
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
private[sql] object DataSourceV2Utils extends Logging {
@@ -124,7 +124,15 @@ private[sql] object DataSourceV2Utils extends Logging {
val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
val timeTravelVersion = if (version.isPresent) Some(version.get) else None
- val timeTravelTimestamp = if (timestamp.isPresent) Some(Literal(timestamp.get)) else None
+ val timeTravelTimestamp = if (timestamp.isPresent) {
+ if (timestamp.get.forall(_.isDigit)) {
+ Some(Literal(timestamp.get.toLong, LongType))
+ } else {
+ Some(Literal(timestamp.get))
+ }
+ } else {
+ None
+ }
val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
(CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident))
case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9c92c1d9a0b..c82d875faa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
import java.time.{Duration, LocalDate, Period}
import scala.collection.JavaConverters._
+import scala.concurrent.duration.MICROSECONDS
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -2591,6 +2592,8 @@ class DataSourceV2SQLSuite
val ts2 = DateTimeUtils.stringToTimestampAnsi(
UTF8String.fromString("2021-01-29 00:00:00"),
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ val ts1InSeconds = MICROSECONDS.toSeconds(ts1).toString
+ val ts2InSeconds = MICROSECONDS.toSeconds(ts2).toString
val t3 = s"testcat.t$ts1"
val t4 = s"testcat.t$ts2"
@@ -2607,6 +2610,14 @@ class DataSourceV2SQLSuite
=== Array(Row(5), Row(6)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect
=== Array(Row(7), Row(8)))
+ assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect
+ === Array(Row(5), Row(6)))
+ assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect
+ === Array(Row(7), Row(8)))
+ assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect
+ === Array(Row(5), Row(6)))
+ assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect
+ === Array(Row(7), Row(8)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect
=== Array(Row(7), Row(8)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fc843a86aa5..f8278d18b0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector
import java.util.Optional
+import scala.concurrent.duration.MICROSECONDS
import scala.language.implicitConversions
import scala.util.Try
@@ -322,6 +323,12 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
checkAnswer(load("t", Some(catalogName), version = None,
timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+
+ // load with timestamp in number format
+ checkAnswer(load("t", Some(catalogName), version = None,
+ timestamp = Some(MICROSECONDS.toSeconds(ts1).toString)), df3.toDF())
+ checkAnswer(load("t", Some(catalogName), version = None,
+ timestamp = Some(MICROSECONDS.toSeconds(ts2).toString)), df4.toDF())
}
val e = intercept[AnalysisException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org