You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/11/16 15:12:11 UTC
[spark] branch master updated: [SPARK-37219][SQL] Add AS OF syntax support
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e99fdf9 [SPARK-37219][SQL] Add AS OF syntax support
e99fdf9 is described below
commit e99fdf9654481dd9b691a3c10e52f3f3db6ed2ba
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Tue Nov 16 23:11:15 2021 +0800
[SPARK-37219][SQL] Add AS OF syntax support
### What changes were proposed in this pull request?
https://docs.databricks.com/delta/quick-start.html#query-an-earlier-version-of-the-table-time-travel
Delta Lake time travel allows users to query an older snapshot of a Delta table. To query an older version of a table, a user specifies a version or a timestamp in a SELECT statement using the AS OF syntax, as follows:
```
SELECT * FROM default.people10m VERSION AS OF 0;
SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58';
```
This PR adds support for the AS OF syntax to Spark.
### Why are the changes needed?
The following new SQL syntax will be supported:
Delta Lake time travel syntax
- VERSION AS OF
- TIMESTAMP AS OF
For example:
```
SELECT * FROM default.people10m VERSION AS OF 0;
SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58';
```
To be compatible with the Hive time travel syntax (https://issues.apache.org/jira/browse/HIVE-25344) and the temporal features of SQL:2011 (https://sigmodrecord.org/publications/sigmodRecord/1209/pdfs/07.industry.kulkarni.pdf), we will also support:
- FOR SYSTEM_TIME AS OF
- FOR SYSTEM_VERSION AS OF
For example:
```
SELECT * FROM default.people10m FOR SYSTEM_VERSION AS OF 0;
SELECT * FROM default.people10m FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58';
```
Applications can also specify a time travel version or timestamp through `DataFrameReader` options, as follows:
```
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').load('/mnt/delta/people-10m')
df2 = spark.read.format('delta').option('versionAsOf', 2).load('/mnt/delta/people-10m')
```
So that `DataFrameReader` can obtain time travel options such as `timestampAsOf` or `versionAsOf`, we will add the following new APIs to `SupportsCatalogOptions`:
```
/**
* Extracts the timestamp string for time travel from the given options.
*/
default Optional<String> extractTimeTravelTimestamp(CaseInsensitiveStringMap options) {
return Optional.empty();
}
/**
* Extracts the version string for time travel from the given options.
*/
default Optional<String> extractTimeTravelVersion(CaseInsensitiveStringMap options) {
return Optional.empty();
}
```
In addition, two new `loadTable` overloads will be added to `TableCatalog` to support time travel (the `long` timestamp is in microseconds since 1970-01-01 00:00:00 UTC):
```
Table loadTable(Identifier ident, String version)
Table loadTable(Identifier ident, long timestamp)
```
### Does this PR introduce _any_ user-facing change?
Yes. See "Why are the changes needed?" above for the new SQL syntax and APIs.
### How was this patch tested?
New unit tests in `DDLParserSuite`, `DataSourceV2SQLSuite` and `SupportsCatalogOptionsSuite`.
Closes #34497 from huaxingao/asof.
Lead-authored-by: Huaxin Gao <hu...@apple.com>
Co-authored-by: Huaxin Gao <hu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
docs/sql-ref-ansi-compliance.md | 4 +
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 19 ++++-
.../connector/catalog/SupportsCatalogOptions.java | 16 ++++
.../spark/sql/connector/catalog/TableCatalog.java | 30 ++++++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/analysis/CTESubstitution.scala | 2 +-
.../spark/sql/catalyst/analysis/ResolveHints.scala | 4 +-
.../spark/sql/catalyst/analysis/unresolved.scala | 12 ++-
.../spark/sql/catalyst/parser/AstBuilder.scala | 14 +++-
.../sql/connector/catalog/CatalogV2Util.scala | 17 +++-
.../sql/connector/expressions/expressions.scala | 26 +++++++
.../spark/sql/errors/QueryCompilationErrors.scala | 4 +
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 68 +++++++++++++++-
.../connector/catalog/InMemoryTableCatalog.scala | 20 +++++
.../datasources/v2/DataSourceV2Utils.scala | 10 ++-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 53 +++++++++++++
.../connector/SupportsCatalogOptionsSuite.scala | 90 +++++++++++++++++++++-
.../org/apache/spark/sql/hive/test/TestHive.scala | 2 +-
18 files changed, 377 insertions(+), 18 deletions(-)
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 2de8ba7..b93f8b7 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -531,6 +531,8 @@ Below is a list of all the keywords in Spark SQL.
|SUBSTR|non-reserved|non-reserved|non-reserved|
|SUBSTRING|non-reserved|non-reserved|non-reserved|
|SYNC|non-reserved|non-reserved|non-reserved|
+|SYSTEM_TIME|non-reserved|non-reserved|non-reserved|
+|SYSTEM_VERSION|non-reserved|non-reserved|non-reserved|
|TABLE|reserved|non-reserved|reserved|
|TABLES|non-reserved|non-reserved|non-reserved|
|TABLESAMPLE|non-reserved|non-reserved|reserved|
@@ -540,6 +542,7 @@ Below is a list of all the keywords in Spark SQL.
|TERMINATED|non-reserved|non-reserved|non-reserved|
|THEN|reserved|non-reserved|reserved|
|TIME|reserved|non-reserved|reserved|
+|TIMESTAMP|non-reserved|non-reserved|non-reserved|
|TO|reserved|non-reserved|reserved|
|TOUCH|non-reserved|non-reserved|non-reserved|
|TRAILING|reserved|non-reserved|reserved|
@@ -564,6 +567,7 @@ Below is a list of all the keywords in Spark SQL.
|USER|reserved|non-reserved|reserved|
|USING|reserved|strict-non-reserved|reserved|
|VALUES|non-reserved|non-reserved|reserved|
+|VERSION|non-reserved|non-reserved|non-reserved|
|VIEW|non-reserved|non-reserved|non-reserved|
|VIEWS|non-reserved|non-reserved|non-reserved|
|WHEN|reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6aab5fe..e11f737 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -599,6 +599,11 @@ fromClause
: FROM relation (',' relation)* lateralView* pivotClause?
;
+temporalClause
+    : (FOR SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING)
+    | (FOR SYSTEM_TIME | TIMESTAMP) AS OF timestamp=STRING
+    ;
+
aggregationClause
: GROUP BY groupingExpressionsWithGroupingAnalytics+=groupByClause
(',' groupingExpressionsWithGroupingAnalytics+=groupByClause)*
@@ -711,7 +716,7 @@ identifierComment
;
relationPrimary
- : multipartIdentifier sample? tableAlias #tableName
+ : multipartIdentifier temporalClause? sample? tableAlias #tableName
| '(' query ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
@@ -1229,11 +1234,14 @@ ansiNonReserved
| SUBSTR
| SUBSTRING
| SYNC
+ | SYSTEM_TIME
+ | SYSTEM_VERSION
| TABLES
| TABLESAMPLE
| TBLPROPERTIES
| TEMPORARY
| TERMINATED
+ | TIMESTAMP
| TOUCH
| TRANSACTION
| TRANSACTIONS
@@ -1251,6 +1259,7 @@ ansiNonReserved
| UPDATE
| USE
| VALUES
+ | VERSION
| VIEW
| VIEWS
| WINDOW
@@ -1497,6 +1506,8 @@ nonReserved
| SUBSTR
| SUBSTRING
| SYNC
+ | SYSTEM_TIME
+ | SYSTEM_VERSION
| TABLE
| TABLES
| TABLESAMPLE
@@ -1505,6 +1516,7 @@ nonReserved
| TERMINATED
| THEN
| TIME
+ | TIMESTAMP
| TO
| TOUCH
| TRAILING
@@ -1527,6 +1539,7 @@ nonReserved
| USE
| USER
| VALUES
+ | VERSION
| VIEW
| VIEWS
| WHEN
@@ -1767,6 +1780,8 @@ STRUCT: 'STRUCT';
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
SYNC: 'SYNC';
+SYSTEM_TIME: 'SYSTEM_TIME';
+SYSTEM_VERSION: 'SYSTEM_VERSION';
TABLE: 'TABLE';
TABLES: 'TABLES';
TABLESAMPLE: 'TABLESAMPLE';
@@ -1775,6 +1790,7 @@ TEMPORARY: 'TEMPORARY' | 'TEMP';
TERMINATED: 'TERMINATED';
THEN: 'THEN';
TIME: 'TIME';
+TIMESTAMP: 'TIMESTAMP';
TO: 'TO';
TOUCH: 'TOUCH';
TRAILING: 'TRAILING';
@@ -1799,6 +1815,7 @@ USE: 'USE';
USER: 'USER';
USING: 'USING';
VALUES: 'VALUES';
+VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WHEN: 'WHEN';
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
index e779d52..a84f179 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
@@ -17,6 +17,8 @@
package org.apache.spark.sql.connector.catalog;
+import java.util.Optional;
+
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -52,4 +54,18 @@ public interface SupportsCatalogOptions extends TableProvider {
default String extractCatalog(CaseInsensitiveStringMap options) {
return CatalogManager.SESSION_CATALOG_NAME();
}
+
+ /**
+ * Extracts the timestamp string for time travel from the given options.
+ */
+ default Optional<String> extractTimeTravelTimestamp(CaseInsensitiveStringMap options) {
+ return Optional.empty();
+ }
+
+ /**
+ * Extracts the version string for time travel from the given options.
+ */
+ default Optional<String> extractTimeTravelVersion(CaseInsensitiveStringMap options) {
+ return Optional.empty();
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 4163d86..d7a45f6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -95,6 +95,36 @@ public interface TableCatalog extends CatalogPlugin {
Table loadTable(Identifier ident) throws NoSuchTableException;
/**
+ * Load table metadata of a specific version by {@link Identifier identifier} from the catalog.
+ * <p>
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param version version of the table
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ default Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+ throw new UnsupportedOperationException("Load table with version is not supported.");
+ }
+
+ /**
+ * Load table metadata at a specific time by {@link Identifier identifier} from the catalog.
+ * <p>
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param timestamp timestamp of the table, which is microseconds since 1970-01-01 00:00:00 UTC
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+ throw new UnsupportedOperationException("Load table with timestamp is not supported.");
+ }
+
+ /**
* Invalidate cached table metadata for an {@link Identifier identifier}.
* <p>
* If the table is already loaded or cached, drop cached data. If the table does not exist or is
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 26d206f..4186a5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1186,8 +1186,8 @@ class Analyzer(override val catalogManager: CatalogManager)
newRelation.copyTagsFrom(multi)
newRelation
}).orElse {
- val loaded = createRelation(
- catalog, ident, CatalogV2Util.loadTable(catalog, ident), u.options, u.isStreaming)
+ val table = CatalogV2Util.loadTable(catalog, ident, u.timeTravelSpec)
+ val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
loaded
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index a67d85d..ec3d957 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -224,7 +224,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
alwaysInline: Boolean,
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
- case u @ UnresolvedRelation(Seq(table), _, _) =>
+ case u @ UnresolvedRelation(Seq(table), _, _, _) =>
cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case (_, d) =>
if (alwaysInline) {
d.child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 27f2a5f..10d8d39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -106,7 +106,7 @@ object ResolveHints {
val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
- case ResolvedHint(u @ UnresolvedRelation(ident, _, _), hint)
+ case ResolvedHint(u @ UnresolvedRelation(ident, _, _, _), hint)
if matchedIdentifierInHint(ident) =>
ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))
@@ -114,7 +114,7 @@ object ResolveHints {
if matchedIdentifierInHint(extractIdentifier(r)) =>
ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))
- case UnresolvedRelation(ident, _, _) if matchedIdentifierInHint(ident) =>
+ case UnresolvedRelation(ident, _, _, _) if matchedIdentifierInHint(ident) =>
ResolvedHint(plan, createHintInfo(hintName))
case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 0785336..af6837a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.connector.expressions.TimeTravelSpec
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -45,7 +46,8 @@ class UnresolvedException(function: String)
case class UnresolvedRelation(
multipartIdentifier: Seq[String],
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
- override val isStreaming: Boolean = false)
+ override val isStreaming: Boolean = false,
+ timeTravelSpec: Option[TimeTravelSpec] = None)
extends LeafNode with NamedRelation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -65,9 +67,13 @@ object UnresolvedRelation {
def apply(
tableIdentifier: TableIdentifier,
extraOptions: CaseInsensitiveStringMap,
- isStreaming: Boolean): UnresolvedRelation = {
+ isStreaming: Boolean,
+ timeTravelSpec: Option[TimeTravelSpec]): UnresolvedRelation = {
UnresolvedRelation(
- tableIdentifier.database.toSeq :+ tableIdentifier.table, extraOptions, isStreaming)
+ tableIdentifier.database.toSeq :+ tableIdentifier.table,
+ extraOptions,
+ isStreaming,
+ timeTravelSpec)
}
def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 71cd8b2..754f92b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, Inte
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, TimeTravelSpec, Transform, YearsTransform}
import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -1257,7 +1257,17 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
- val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
+    // Time travel clause (`VERSION AS OF`, `TIMESTAMP AS OF`, ...); None when absent.
+    val timeTravel = Option(ctx.temporalClause).flatMap { temporal =>
+      // An integer version keeps its literal text; a string version is unquoted via `string`.
+      val version = Option(temporal.version).map { token =>
+        if (temporal.INTEGER_VALUE != null) token.getText else string(token)
+      }
+      TimeTravelSpec.create(Option(temporal.timestamp).map(string), version)
+    }
+
+    val table = mayApplyAliasPlan(ctx.tableAlias,
+      UnresolvedRelation(tableId, timeTravelSpec = timeTravel))
table.optionalMap(ctx.sample)(withSample)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 69625a1..c010b6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.expressions.{AsOfTimestamp, AsOfVersion, TimeTravelSpec}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -276,9 +277,21 @@ private[sql] object CatalogV2Util {
new StructType(newFields)
}
- def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] =
+ def loadTable(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ timeTravelSpec: Option[TimeTravelSpec] = None): Option[Table] =
try {
- Option(catalog.asTableCatalog.loadTable(ident))
+ if (timeTravelSpec.nonEmpty) {
+ timeTravelSpec.get match {
+ case v: AsOfVersion =>
+ Option(catalog.asTableCatalog.loadTable(ident, v.version))
+ case ts: AsOfTimestamp =>
+ Option(catalog.asTableCatalog.loadTable(ident, ts.timestamp))
+ }
+ } else {
+ Option(catalog.asTableCatalog.loadTable(ident))
+ }
} catch {
case _: NoSuchTableException => None
case _: NoSuchDatabaseException => None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
index 2863d94..32fe208 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.connector.expressions
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
/**
* Helper methods for working with the logical expressions API.
@@ -357,3 +361,25 @@ private[sql] object SortValue {
None
}
}
+
+/** Which snapshot of a table a time-travel read targets. */
+private[sql] sealed trait TimeTravelSpec
+/** Read the table as of `timestamp`, in microseconds since 1970-01-01 00:00:00 UTC. */
+private[sql] final case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
+/** Read the table at `version`, an identifier whose meaning is left to the catalog. */
+private[sql] final case class AsOfVersion(version: String) extends TimeTravelSpec
+
+private[sql] object TimeTravelSpec {
+  /** Builds a spec from `timestamp` or `version`; both set is an error, neither gives None. */
+  def create(timestamp: Option[String], version: Option[String]): Option[TimeTravelSpec] =
+    (timestamp, version) match {
+      case (Some(_), Some(_)) =>
+        throw QueryCompilationErrors.invalidTimeTravelSpecError()
+      case (Some(ts), None) =>
+        // Parsed with ANSI rules in the session time zone; malformed input throws.
+        Some(AsOfTimestamp(DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(ts),
+          DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))))
+      case (None, Some(v)) => Some(AsOfVersion(v))
+      case (None, None) => None
+    }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fe1c358..207a9c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2371,4 +2371,8 @@ object QueryCompilationErrors {
new AnalysisException(
s"Invalid view text: $viewText. The view $tableName may have been tampered with")
}
+
+ def invalidTimeTravelSpecError(): Throwable = {
+ new AnalysisException("Cannot specify both version and timestamp when scanning the table.")
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 572fccf..cea1fdf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.parser
+import java.time.DateTimeException
+import java.util
import java.util.Locale
import org.apache.spark.sql.AnalysisException
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, TimeTravelSpec, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class DDLParserSuite extends AnalysisTest {
@@ -2438,4 +2441,67 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(timestampTypeSql), insertPartitionPlan(timestamp))
comparePlans(parsePlan(binaryTypeSql), insertPartitionPlan(binaryStr))
}
+
+ test("as of syntax") {
+ val properties = new util.HashMap[String, String]
+ var timeTravel = TimeTravelSpec.create(None, Some("Snapshot123456789"))
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c VERSION AS OF 'Snapshot123456789'"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+
+ timeTravel = TimeTravelSpec.create(None, Some("123456789"))
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c FOR SYSTEM_VERSION AS OF 123456789"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+
+ timeTravel = TimeTravelSpec.create(Some("2019-01-29 00:37:58"), None)
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-29 00:37:58'"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58'"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+
+ timeTravel = TimeTravelSpec.create(Some("2019-01-29"), None)
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-29'"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+ comparePlans(
+ parsePlan("SELECT * FROM a.b.c FOR SYSTEM_TIME AS OF '2019-01-29'"),
+ Project(Seq(UnresolvedStar(None)),
+ UnresolvedRelation(
+ Seq("a", "b", "c"),
+ new CaseInsensitiveStringMap(properties),
+ timeTravelSpec = timeTravel)))
+
+ val e1 = intercept[DateTimeException] {
+ parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-11111'")
+ }.getMessage
+ assert(e1.contains("Cannot cast 2019-01-11111 to TimestampType."))
+
+ val e2 = intercept[AnalysisException] {
+ timeTravel = TimeTravelSpec.create(Some("2019-01-29 00:37:58"), Some("123456789"))
+ }.getMessage
+ assert(e2.contains("Cannot specify both version and timestamp when scanning the table."))
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 0c403ba..d8e6bc4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -60,6 +60,26 @@ class BasicInMemoryTableCatalog extends TableCatalog {
}
}
+  /** Loads `ident` at `version`; see `loadSnapshot` for how snapshots are located. */
+  override def loadTable(ident: Identifier, version: String): Table =
+    loadSnapshot(ident, version)
+
+  /** Loads `ident` as of `timestamp` (microseconds since the epoch); see `loadSnapshot`. */
+  override def loadTable(ident: Identifier, timestamp: Long): Table =
+    loadSnapshot(ident, timestamp.toString)
+
+  /**
+   * Mock time travel for tests: the snapshot of `ident` is the table registered in the same
+   * namespace under the name `ident.name + suffix`, where `suffix` is the requested version
+   * string or the decimal timestamp.
+   *
+   * @throws NoSuchTableException for the requested `ident` if no such table is registered
+   */
+  private def loadSnapshot(ident: Identifier, suffix: String): Table = {
+    val snapshotIdent = Identifier.of(ident.namespace, ident.name + suffix)
+    Option(tables.get(snapshotIdent)).getOrElse(throw new NoSuchTableException(ident))
+  }
+
override def invalidateTable(ident: Identifier): Unit = {
invalidatedTables.add(ident)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index a0874fc..eeb12c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.TimeTravelSpec
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -117,7 +118,14 @@ private[sql] object DataSourceV2Utils extends Logging {
hasCatalog,
catalogManager,
dsOptions)
- (catalog.loadTable(ident), Some(catalog), Some(ident))
+
+        val version = Option(hasCatalog.extractTimeTravelVersion(dsOptions).orElse(null))
+        val timestamp = Option(hasCatalog.extractTimeTravelTimestamp(dsOptions).orElse(null))
+        val timeTravel = TimeTravelSpec.create(timestamp, version)
+        // CatalogV2Util.loadTable maps a missing table to None; surface NoSuchTableException.
+        val table = CatalogV2Util.loadTable(catalog, ident, timeTravel).getOrElse(
+          throw new org.apache.spark.sql.catalyst.analysis.NoSuchTableException(ident))
+        (table, Some(catalog), Some(ident))
case _ =>
// TODO: Non-catalog paths for DSV2 are currently not well defined.
val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9699685..7bdcad0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
@@ -2948,6 +2949,58 @@ class DataSourceV2SQLSuite
}
}
+ test("Mock time travel test") {
+ sql("use testcat")
+ val t1 = "testcat.tSnapshot123456789"
+ val t2 = "testcat.t2345678910"
+ withTable(t1, t2) {
+ sql(s"CREATE TABLE $t1 (id int) USING foo")
+ sql(s"CREATE TABLE $t2 (id int) USING foo")
+
+ sql(s"INSERT INTO $t1 VALUES (1)")
+ sql(s"INSERT INTO $t1 VALUES (2)")
+ sql(s"INSERT INTO $t2 VALUES (3)")
+ sql(s"INSERT INTO $t2 VALUES (4)")
+
+ assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect
+ === Array(Row(1), Row(2)))
+ assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect
+ === Array(Row(3), Row(4)))
+ assert(sql("SELECT * FROM t FOR SYSTEM_VERSION AS OF 'Snapshot123456789'").collect
+ === Array(Row(1), Row(2)))
+ assert(sql("SELECT * FROM t FOR SYSTEM_VERSION AS OF 2345678910").collect
+ === Array(Row(3), Row(4)))
+ }
+
+ val ts1 = DateTimeUtils.stringToTimestampAnsi(
+ UTF8String.fromString("2019-01-29 00:37:58"),
+ DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ val ts2 = DateTimeUtils.stringToTimestampAnsi(
+ UTF8String.fromString("2021-01-29 00:37:58"),
+ DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ val t3 = s"testcat.t$ts1"
+ val t4 = s"testcat.t$ts2"
+
+ withTable(t3, t4) {
+ sql(s"CREATE TABLE $t3 (id int) USING foo")
+ sql(s"CREATE TABLE $t4 (id int) USING foo")
+
+ sql(s"INSERT INTO $t3 VALUES (5)")
+ sql(s"INSERT INTO $t3 VALUES (6)")
+ sql(s"INSERT INTO $t4 VALUES (7)")
+ sql(s"INSERT INTO $t4 VALUES (8)")
+
+ assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect
+ === Array(Row(5), Row(6)))
+ assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:37:58'").collect
+ === Array(Row(7), Row(8)))
+ assert(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58'").collect
+ === Array(Row(5), Row(6)))
+ assert(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF '2021-01-29 00:37:58'").collect
+ === Array(Row(7), Row(8)))
+ }
+ }
+
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 076dad7..3840dd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -17,24 +17,29 @@
package org.apache.spark.sql.connector
+import java.util.Optional
+
import scala.language.implicitConversions
import scala.util.Try
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener}
+import org.apache.spark.unsafe.types.UTF8String
class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with BeforeAndAfter {
@@ -271,6 +276,61 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
}
}
+  test("mock time travel test") {
+    // Writes `rows` through the provider into table `tableName` of the test catalog.
+    def writeRows(rows: DataFrame, tableName: String, mode: SaveMode): Unit =
+      rows.write
+        .format(format)
+        .option("name", tableName)
+        .option("catalog", catalogName)
+        .mode(mode)
+        .save()
+
+    // Version-based loads: each version string is paired with its own table `t<version>`.
+    val snapshotName = "tSnapshot123456789"
+    val numericName = "t2345678910"
+    withTable(s"$catalogName.$snapshotName", s"$catalogName.$numericName") {
+      Seq(snapshotName, numericName).foreach { tbl =>
+        sql(s"create table $catalogName.$tbl (id bigint) using $format")
+      }
+
+      val snapshotRows = spark.range(10).toDF()
+      writeRows(snapshotRows, snapshotName, SaveMode.Append)
+
+      val numericRows = spark.range(10, 20).toDF()
+      writeRows(numericRows, numericName, SaveMode.Overwrite)
+
+      checkAnswer(load("t", Some(catalogName), version = Some("Snapshot123456789")), snapshotRows)
+      checkAnswer(load("t", Some(catalogName), version = Some("2345678910")), numericRows)
+    }
+
+    // Timestamp-based loads: tables are suffixed with the parsed timestamp in the session zone.
+    val sessionZone = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
+    def parsed(text: String): Long =
+      DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(text), sessionZone)
+    val earlyName = s"t${parsed("2019-01-29 00:37:58")}"
+    val lateName = s"t${parsed("2021-01-29 00:37:58")}"
+    withTable(s"$catalogName.$earlyName", s"$catalogName.$lateName") {
+      Seq(earlyName, lateName).foreach { tbl =>
+        sql(s"create table $catalogName.$tbl (id bigint) using $format")
+      }
+
+      val earlyRows = spark.range(30, 40).toDF()
+      writeRows(earlyRows, earlyName, SaveMode.Append)
+
+      val lateRows = spark.range(50, 60).toDF()
+      writeRows(lateRows, lateName, SaveMode.Overwrite)
+
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some("2019-01-29 00:37:58")), earlyRows)
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some("2021-01-29 00:37:58")), lateRows)
+    }
+
+    // Supplying a version and a timestamp together must be rejected.
+    val failure = intercept[AnalysisException] {
+      load("t", Some(catalogName), version = Some("12345678"),
+        timestamp = Some("2019-01-29 00:37:58"))
+    }
+    assert(failure.getMessage
+      .contains("Cannot specify both version and timestamp when scanning the table."))
+  }
+
private def checkV2Identifiers(
plan: LogicalPlan,
identifier: String = "t1",
@@ -281,9 +341,19 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
assert(v2.catalog.exists(_ == catalogPlugin))
}
-  private def load(name: String, catalogOpt: Option[String]): DataFrame = {
+  /**
+   * Builds a reader over `format` with the given options and loads it.
+   *
+   * @param name       value of the `name` reader option
+   * @param catalogOpt optional value of the `catalog` reader option
+   * @param version    optional time-travel version, passed as the `versionAsOf` option
+   * @param timestamp  optional time-travel timestamp, passed as the `timestampAsOf` option
+   * @return the loaded DataFrame
+   */
+  private def load(
+      name: String,
+      catalogOpt: Option[String],
+      version: Option[String] = None,
+      timestamp: Option[String] = None): DataFrame = {
    val dfr = spark.read.format(format).option("name", name)
    catalogOpt.foreach(cName => dfr.option("catalog", cName))
+    // `option` updates the reader in place (its result is discarded, as with `catalog` above),
+    // so `foreach` suffices and avoids `Option.get`.
+    version.foreach(v => dfr.option("versionAsOf", v))
+    timestamp.foreach(ts => dfr.option("timestampAsOf", ts))
    dfr.load()
  }
@@ -312,4 +382,20 @@ class CatalogSupportingInMemoryTableProvider
  override def extractCatalog(options: CaseInsensitiveStringMap): String = {
    // The catalog name is taken verbatim from the "catalog" option supplied to the reader/writer.
    options.get("catalog")
  }
+
+  /**
+   * Returns the `versionAsOf` option, or an empty Optional when that option is not set
+   * (the options map yields null for a missing key).
+   */
+  override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] =
+    Optional.ofNullable(options.get("versionAsOf"))
+
+  /**
+   * Returns the `timestampAsOf` option, or an empty Optional when that option is not set
+   * (the options map yields null for a missing key).
+   */
+  override def extractTimeTravelTimestamp(options: CaseInsensitiveStringMap): Optional[String] =
+    Optional.ofNullable(options.get("timestampAsOf"))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 3769de0..10cc55b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -599,7 +599,7 @@ private[hive] class TestHiveQueryExecution(
override lazy val analyzed: LogicalPlan = sparkSession.withActive {
// Make sure any test tables referenced are loaded.
val referencedTables = logical.collect {
- case UnresolvedRelation(ident, _, _) =>
+ case UnresolvedRelation(ident, _, _, _) =>
if (ident.length > 1 && ident.head.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) {
ident.tail.asTableIdentifier
} else ident.asTableIdentifier
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org