Posted to commits@spark.apache.org by we...@apache.org on 2021/11/16 15:12:11 UTC

[spark] branch master updated: [SPARK-37219][SQL] Add AS OF syntax support

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e99fdf9  [SPARK-37219][SQL] Add AS OF syntax support
e99fdf9 is described below

commit e99fdf9654481dd9b691a3c10e52f3f3db6ed2ba
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Tue Nov 16 23:11:15 2021 +0800

    [SPARK-37219][SQL] Add AS OF syntax support
    
    ### What changes were proposed in this pull request?
    https://docs.databricks.com/delta/quick-start.html#query-an-earlier-version-of-the-table-time-travel
    
    Delta Lake time travel allows users to query an older snapshot of a Delta table. To query an older version of a table, a user specifies a version or timestamp in a SELECT statement using the AS OF syntax, as follows:
    
    ```
    SELECT * FROM default.people10m VERSION AS OF 0;
    
    SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58';
    ```
    
    This PR adds AS OF syntax support to Spark.
    
    ### Why are the changes needed?
    The following new SQL syntax will be supported:
    
    Delta Lake time travel syntax
    - VERSION AS OF
    - TIMESTAMP AS OF
    
    For example:
    ```
    SELECT * FROM default.people10m VERSION AS OF 0;
    
    SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58';
    ```
    In order to be compatible with the Hive time travel syntax (https://issues.apache.org/jira/browse/HIVE-25344) and the temporal features in SQL:2011 (https://sigmodrecord.org/publications/sigmodRecord/1209/pdfs/07.industry.kulkarni.pdf), we will also support
    
    - FOR SYSTEM_TIME AS OF
    - FOR SYSTEM_VERSION AS OF
    
    For example:
    ```
    SELECT * FROM default.people10m FOR SYSTEM_VERSION AS OF 0;
    
    SELECT * FROM default.people10m FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58';
    ```
    
    Applications can also specify a time travel version or timestamp using `DataFrameReader`, as follows:
    ```
    df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').load('/mnt/delta/people-10m')
    
    df2 = spark.read.format('delta').option('versionAsOf', 2).load('/mnt/delta/people-10m')
    ```
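    For Scala users, an equivalent of the Python snippet above (same hypothetical Delta path) would be:
    ```
    val df1 = spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("/mnt/delta/people-10m")
    
    val df2 = spark.read.format("delta").option("versionAsOf", "2").load("/mnt/delta/people-10m")
    ```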
    
    In order for `DataFrameReader` to pick up time travel options such as `timestampAsOf` or `versionAsOf`, we will add the following new APIs to `SupportsCatalogOptions`:
    ```
      /**
       * Extracts the timestamp string for time travel from the given options.
       */
      default Optional<String> extractTimeTravelTimestamp(CaseInsensitiveStringMap options) {
        return Optional.empty();
      }
    
      /**
       * Extracts the version string for time travel from the given options.
       */
      default Optional<String> extractTimeTravelVersion(CaseInsensitiveStringMap options) {
        return Optional.empty();
      }
    ```
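    As an illustration (not part of this PR), a provider could implement these hooks by surfacing its own reader options. A minimal Scala sketch, assuming `versionAsOf`/`timestampAsOf` as the option names (the same ones used in this PR's tests) and with the unrelated `TableProvider` members stubbed out:
    ```
    import java.util.Optional
    
    import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions, Table}
    import org.apache.spark.sql.connector.expressions.Transform
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap
    
    class TimeTravelProvider extends SupportsCatalogOptions {
      // Unrelated TableProvider/SupportsCatalogOptions members, stubbed for brevity.
      override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???
      override def getTable(
          schema: StructType,
          partitioning: Array[Transform],
          properties: java.util.Map[String, String]): Table = ???
      override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = ???
    
      // Surface the version/timestamp reader options, if present, for time travel.
      override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] =
        Optional.ofNullable(options.get("versionAsOf"))
    
      override def extractTimeTravelTimestamp(options: CaseInsensitiveStringMap): Optional[String] =
        Optional.ofNullable(options.get("timestampAsOf"))
    }
    ```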
    In addition, two new `loadTable` overloads will be added to `TableCatalog` to support time travel:
    
    ```
    Table loadTable(Identifier ident, String version)
    
    Table loadTable(Identifier ident, long timestamp)
    ```
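    For example (purely illustrative, not part of this PR), a catalog that keeps per-version and per-timestamp snapshots in memory could resolve them as sketched below; the `InMemoryTableCatalog` in the diff instead encodes the version or timestamp into the table name:
    ```
    import java.util.concurrent.ConcurrentHashMap
    
    import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
    import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
    
    abstract class SnapshotAwareCatalog extends TableCatalog {
      // Hypothetical snapshot indexes; a real catalog would consult its own metadata.
      protected val tablesByVersion = new ConcurrentHashMap[(Identifier, String), Table]()
      protected val tablesByTimestamp = new ConcurrentHashMap[(Identifier, Long), Table]()
    
      override def loadTable(ident: Identifier, version: String): Table =
        Option(tablesByVersion.get((ident, version)))
          .getOrElse(throw new NoSuchTableException(ident))
    
      // Timestamp is microseconds since 1970-01-01 00:00:00 UTC, per the new API's Javadoc.
      override def loadTable(ident: Identifier, timestamp: Long): Table =
        Option(tablesByTimestamp.get((ident, timestamp)))
          .getOrElse(throw new NoSuchTableException(ident))
    }
    ```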
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, please see the "Why are the changes needed?" section above for the new SQL syntax and APIs.
    
    ### How was this patch tested?
    New unit tests.
    
    Closes #34497 from huaxingao/asof.
    
    Lead-authored-by: Huaxin Gao <hu...@apple.com>
    Co-authored-by: Huaxin Gao <hu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |  4 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    | 19 ++++-
 .../connector/catalog/SupportsCatalogOptions.java  | 16 ++++
 .../spark/sql/connector/catalog/TableCatalog.java  | 30 ++++++++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../sql/catalyst/analysis/CTESubstitution.scala    |  2 +-
 .../spark/sql/catalyst/analysis/ResolveHints.scala |  4 +-
 .../spark/sql/catalyst/analysis/unresolved.scala   | 12 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 14 +++-
 .../sql/connector/catalog/CatalogV2Util.scala      | 17 +++-
 .../sql/connector/expressions/expressions.scala    | 26 +++++++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 +
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 68 +++++++++++++++-
 .../connector/catalog/InMemoryTableCatalog.scala   | 20 +++++
 .../datasources/v2/DataSourceV2Utils.scala         | 10 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 53 +++++++++++++
 .../connector/SupportsCatalogOptionsSuite.scala    | 90 +++++++++++++++++++++-
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  2 +-
 18 files changed, 377 insertions(+), 18 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 2de8ba7..b93f8b7 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -531,6 +531,8 @@ Below is a list of all the keywords in Spark SQL.
 |SUBSTR|non-reserved|non-reserved|non-reserved|
 |SUBSTRING|non-reserved|non-reserved|non-reserved|
 |SYNC|non-reserved|non-reserved|non-reserved|
+|SYSTEM_TIME|non-reserved|non-reserved|non-reserved|
+|SYSTEM_VERSION|non-reserved|non-reserved|non-reserved|
 |TABLE|reserved|non-reserved|reserved|
 |TABLES|non-reserved|non-reserved|non-reserved|
 |TABLESAMPLE|non-reserved|non-reserved|reserved|
@@ -540,6 +542,7 @@ Below is a list of all the keywords in Spark SQL.
 |TERMINATED|non-reserved|non-reserved|non-reserved|
 |THEN|reserved|non-reserved|reserved|
 |TIME|reserved|non-reserved|reserved|
+|TIMESTAMP|non-reserved|non-reserved|non-reserved|
 |TO|reserved|non-reserved|reserved|
 |TOUCH|non-reserved|non-reserved|non-reserved|
 |TRAILING|reserved|non-reserved|reserved|
@@ -564,6 +567,7 @@ Below is a list of all the keywords in Spark SQL.
 |USER|reserved|non-reserved|reserved|
 |USING|reserved|strict-non-reserved|reserved|
 |VALUES|non-reserved|non-reserved|reserved|
+|VERSION|non-reserved|non-reserved|non-reserved|
 |VIEW|non-reserved|non-reserved|non-reserved|
 |VIEWS|non-reserved|non-reserved|non-reserved|
 |WHEN|reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6aab5fe..e11f737 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -599,6 +599,11 @@ fromClause
     : FROM relation (',' relation)* lateralView* pivotClause?
     ;
 
+temporalClause
+    : ((FOR SYSTEM_VERSION) | VERSION) AS OF version=(INTEGER_VALUE | STRING)
+    | ((FOR SYSTEM_TIME) | TIMESTAMP) AS OF timestamp=STRING
+    ;
+
 aggregationClause
     : GROUP BY groupingExpressionsWithGroupingAnalytics+=groupByClause
         (',' groupingExpressionsWithGroupingAnalytics+=groupByClause)*
@@ -711,7 +716,7 @@ identifierComment
     ;
 
 relationPrimary
-    : multipartIdentifier sample? tableAlias  #tableName
+    : multipartIdentifier temporalClause? sample? tableAlias  #tableName
     | '(' query ')' sample? tableAlias        #aliasedQuery
     | '(' relation ')' sample? tableAlias     #aliasedRelation
     | inlineTable                             #inlineTableDefault2
@@ -1229,11 +1234,14 @@ ansiNonReserved
     | SUBSTR
     | SUBSTRING
     | SYNC
+    | SYSTEM_TIME
+    | SYSTEM_VERSION
     | TABLES
     | TABLESAMPLE
     | TBLPROPERTIES
     | TEMPORARY
     | TERMINATED
+    | TIMESTAMP
     | TOUCH
     | TRANSACTION
     | TRANSACTIONS
@@ -1251,6 +1259,7 @@ ansiNonReserved
     | UPDATE
     | USE
     | VALUES
+    | VERSION
     | VIEW
     | VIEWS
     | WINDOW
@@ -1497,6 +1506,8 @@ nonReserved
     | SUBSTR
     | SUBSTRING
     | SYNC
+    | SYSTEM_TIME
+    | SYSTEM_VERSION
     | TABLE
     | TABLES
     | TABLESAMPLE
@@ -1505,6 +1516,7 @@ nonReserved
     | TERMINATED
     | THEN
     | TIME
+    | TIMESTAMP
     | TO
     | TOUCH
     | TRAILING
@@ -1527,6 +1539,7 @@ nonReserved
     | USE
     | USER
     | VALUES
+    | VERSION
     | VIEW
     | VIEWS
     | WHEN
@@ -1767,6 +1780,8 @@ STRUCT: 'STRUCT';
 SUBSTR: 'SUBSTR';
 SUBSTRING: 'SUBSTRING';
 SYNC: 'SYNC';
+SYSTEM_TIME: 'SYSTEM_TIME';
+SYSTEM_VERSION: 'SYSTEM_VERSION';
 TABLE: 'TABLE';
 TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
@@ -1775,6 +1790,7 @@ TEMPORARY: 'TEMPORARY' | 'TEMP';
 TERMINATED: 'TERMINATED';
 THEN: 'THEN';
 TIME: 'TIME';
+TIMESTAMP: 'TIMESTAMP';
 TO: 'TO';
 TOUCH: 'TOUCH';
 TRAILING: 'TRAILING';
@@ -1799,6 +1815,7 @@ USE: 'USE';
 USER: 'USER';
 USING: 'USING';
 VALUES: 'VALUES';
+VERSION: 'VERSION';
 VIEW: 'VIEW';
 VIEWS: 'VIEWS';
 WHEN: 'WHEN';
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
index e779d52..a84f179 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.connector.catalog;
 
+import java.util.Optional;
+
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
@@ -52,4 +54,18 @@ public interface SupportsCatalogOptions extends TableProvider {
   default String extractCatalog(CaseInsensitiveStringMap options) {
     return CatalogManager.SESSION_CATALOG_NAME();
   }
+
+  /**
+   * Extracts the timestamp string for time travel from the given options.
+   */
+  default Optional<String> extractTimeTravelTimestamp(CaseInsensitiveStringMap options) {
+    return Optional.empty();
+  }
+
+  /**
+   * Extracts the version string for time travel from the given options.
+   */
+  default Optional<String> extractTimeTravelVersion(CaseInsensitiveStringMap options) {
+    return Optional.empty();
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 4163d86..d7a45f6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -95,6 +95,36 @@ public interface TableCatalog extends CatalogPlugin {
   Table loadTable(Identifier ident) throws NoSuchTableException;
 
   /**
+   * Load table metadata of a specific version by {@link Identifier identifier} from the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @param version version of the table
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   */
+  default Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+    throw new UnsupportedOperationException("Load table with version is not supported.");
+  }
+
+  /**
+   * Load table metadata at a specific time by {@link Identifier identifier} from the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @param timestamp timestamp of the table, which is microseconds since 1970-01-01 00:00:00 UTC
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   */
+  default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+    throw new UnsupportedOperationException("Load table with timestamp is not supported.");
+  }
+
+  /**
    * Invalidate cached table metadata for an {@link Identifier identifier}.
    * <p>
    * If the table is already loaded or cached, drop cached data. If the table does not exist or is
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 26d206f..4186a5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1186,8 +1186,8 @@ class Analyzer(override val catalogManager: CatalogManager)
                 newRelation.copyTagsFrom(multi)
                 newRelation
             }).orElse {
-              val loaded = createRelation(
-                catalog, ident, CatalogV2Util.loadTable(catalog, ident), u.options, u.isStreaming)
+              val table = CatalogV2Util.loadTable(catalog, ident, u.timeTravelSpec)
+              val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
               loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
               loaded
             }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index a67d85d..ec3d957 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -224,7 +224,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       alwaysInline: Boolean,
       cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
     plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
-      case u @ UnresolvedRelation(Seq(table), _, _) =>
+      case u @ UnresolvedRelation(Seq(table), _, _, _) =>
         cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case (_, d) =>
           if (alwaysInline) {
             d.child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 27f2a5f..10d8d39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -106,7 +106,7 @@ object ResolveHints {
 
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
-          case ResolvedHint(u @ UnresolvedRelation(ident, _, _), hint)
+          case ResolvedHint(u @ UnresolvedRelation(ident, _, _, _), hint)
               if matchedIdentifierInHint(ident) =>
             ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))
 
@@ -114,7 +114,7 @@ object ResolveHints {
               if matchedIdentifierInHint(extractIdentifier(r)) =>
             ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))
 
-          case UnresolvedRelation(ident, _, _) if matchedIdentifierInHint(ident) =>
+          case UnresolvedRelation(ident, _, _, _) if matchedIdentifierInHint(ident) =>
             ResolvedHint(plan, createHintInfo(hintName))
 
           case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 0785336..af6837a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.connector.expressions.TimeTravelSpec
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -45,7 +46,8 @@ class UnresolvedException(function: String)
 case class UnresolvedRelation(
     multipartIdentifier: Seq[String],
     options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
-    override val isStreaming: Boolean = false)
+    override val isStreaming: Boolean = false,
+    timeTravelSpec: Option[TimeTravelSpec] = None)
   extends LeafNode with NamedRelation {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
@@ -65,9 +67,13 @@ object UnresolvedRelation {
   def apply(
       tableIdentifier: TableIdentifier,
       extraOptions: CaseInsensitiveStringMap,
-      isStreaming: Boolean): UnresolvedRelation = {
+      isStreaming: Boolean,
+      timeTravelSpec: Option[TimeTravelSpec]): UnresolvedRelation = {
     UnresolvedRelation(
-      tableIdentifier.database.toSeq :+ tableIdentifier.table, extraOptions, isStreaming)
+      tableIdentifier.database.toSeq :+ tableIdentifier.table,
+      extraOptions,
+      isStreaming,
+      timeTravelSpec)
   }
 
   def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 71cd8b2..754f92b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, Inte
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, TimeTravelSpec, Transform, YearsTransform}
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1257,7 +1257,17 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
     val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
-    val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
+    val timeTravel = if (ctx.temporalClause != null) {
+      val v = ctx.temporalClause.version
+      val version =
+        if (ctx.temporalClause.INTEGER_VALUE != null) Some(v.getText) else Option(v).map(string)
+      TimeTravelSpec.create(Option(ctx.temporalClause.timestamp).map(string), version)
+    } else {
+      None
+    }
+
+    val table = mayApplyAliasPlan(ctx.tableAlias,
+      UnresolvedRelation(tableId, timeTravelSpec = timeTravel))
     table.optionalMap(ctx.sample)(withSample)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 69625a1..c010b6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.expressions.{AsOfTimestamp, AsOfVersion, TimeTravelSpec}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -276,9 +277,21 @@ private[sql] object CatalogV2Util {
     new StructType(newFields)
   }
 
-  def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] =
+  def loadTable(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      timeTravelSpec: Option[TimeTravelSpec] = None): Option[Table] =
     try {
-      Option(catalog.asTableCatalog.loadTable(ident))
+      if (timeTravelSpec.nonEmpty) {
+        timeTravelSpec.get match {
+          case v: AsOfVersion =>
+            Option(catalog.asTableCatalog.loadTable(ident, v.version))
+          case ts: AsOfTimestamp =>
+            Option(catalog.asTableCatalog.loadTable(ident, ts.timestamp))
+        }
+      } else {
+        Option(catalog.asTableCatalog.loadTable(ident))
+      }
     } catch {
       case _: NoSuchTableException => None
       case _: NoSuchDatabaseException => None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
index 2863d94..32fe208 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.connector.expressions
 
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Helper methods for working with the logical expressions API.
@@ -357,3 +361,25 @@ private[sql] object SortValue {
       None
   }
 }
+
+private[sql] sealed trait TimeTravelSpec
+
+private[sql] case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
+private[sql] case class AsOfVersion(version: String) extends TimeTravelSpec
+
+private[sql] object TimeTravelSpec {
+  def create(timestamp: Option[String], version: Option[String]) : Option[TimeTravelSpec] = {
+    if (timestamp.nonEmpty && version.nonEmpty) {
+      throw QueryCompilationErrors.invalidTimeTravelSpecError()
+    } else if (timestamp.nonEmpty) {
+      val ts = DateTimeUtils.stringToTimestampAnsi(
+        UTF8String.fromString(timestamp.get),
+        DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+      Some(AsOfTimestamp(ts))
+    } else if (version.nonEmpty) {
+      Some(AsOfVersion(version.get))
+    } else {
+      None
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fe1c358..207a9c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2371,4 +2371,8 @@ object QueryCompilationErrors {
     new AnalysisException(
       s"Invalid view text: $viewText. The view $tableName may have been tampered with")
   }
+
+  def invalidTimeTravelSpecError(): Throwable = {
+    new AnalysisException("Cannot specify both version and timestamp when scanning the table.")
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 572fccf..cea1fdf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.parser
 
+import java.time.DateTimeException
+import java.util
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, TimeTravelSpec, Transform, YearsTransform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class DDLParserSuite extends AnalysisTest {
@@ -2438,4 +2441,67 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan(timestampTypeSql), insertPartitionPlan(timestamp))
     comparePlans(parsePlan(binaryTypeSql), insertPartitionPlan(binaryStr))
   }
+
+  test("as of syntax") {
+    val properties = new util.HashMap[String, String]
+    var timeTravel = TimeTravelSpec.create(None, Some("Snapshot123456789"))
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c VERSION AS OF 'Snapshot123456789'"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+
+    timeTravel = TimeTravelSpec.create(None, Some("123456789"))
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c FOR SYSTEM_VERSION AS OF 123456789"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+
+    timeTravel = TimeTravelSpec.create(Some("2019-01-29 00:37:58"), None)
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-29 00:37:58'"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58'"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+
+    timeTravel = TimeTravelSpec.create(Some("2019-01-29"), None)
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-29'"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+    comparePlans(
+      parsePlan("SELECT * FROM a.b.c FOR SYSTEM_TIME AS OF '2019-01-29'"),
+      Project(Seq(UnresolvedStar(None)),
+        UnresolvedRelation(
+          Seq("a", "b", "c"),
+          new CaseInsensitiveStringMap(properties),
+          timeTravelSpec = timeTravel)))
+
+    val e1 = intercept[DateTimeException] {
+      parsePlan("SELECT * FROM a.b.c TIMESTAMP AS OF '2019-01-11111'")
+    }.getMessage
+    assert(e1.contains("Cannot cast 2019-01-11111 to TimestampType."))
+
+    val e2 = intercept[AnalysisException] {
+      timeTravel = TimeTravelSpec.create(Some("2019-01-29 00:37:58"), Some("123456789"))
+    }.getMessage
+    assert(e2.contains("Cannot specify both version and timestamp when scanning the table."))
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 0c403ba..d8e6bc4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -60,6 +60,26 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }
 
+  override def loadTable(ident: Identifier, version: String): Table = {
+    val versionIdent = Identifier.of(ident.namespace, ident.name + version)
+    Option(tables.get(versionIdent)) match {
+      case Some(table) =>
+        table
+      case _ =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def loadTable(ident: Identifier, timestamp: Long): Table = {
+    val timestampIdent = Identifier.of(ident.namespace, ident.name + timestamp)
+    Option(tables.get(timestampIdent)) match {
+      case Some(table) =>
+        table
+      case _ =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
   override def invalidateTable(ident: Identifier): Unit = {
     invalidatedTables.add(ident)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index a0874fc..eeb12c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.expressions.TimeTravelSpec
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -117,7 +118,14 @@ private[sql] object DataSourceV2Utils extends Logging {
           hasCatalog,
           catalogManager,
           dsOptions)
-        (catalog.loadTable(ident), Some(catalog), Some(ident))
+
+        val version = hasCatalog.extractTimeTravelVersion(dsOptions)
+        val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
+
+        val timeTravelVersion = if (version.isPresent) Some(version.get) else None
+        val timeTravelTimestamp = if (timestamp.isPresent) Some(timestamp.get) else None
+        val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion)
+        (CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident))
       case _ =>
         // TODO: Non-catalog paths for DSV2 are currently not well defined.
         val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9699685..7bdcad0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
@@ -2948,6 +2949,58 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("Mock time travel test") {
+    sql("use testcat")
+    val t1 = "testcat.tSnapshot123456789"
+    val t2 = "testcat.t2345678910"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 (id int) USING foo")
+      sql(s"CREATE TABLE $t2 (id int) USING foo")
+
+      sql(s"INSERT INTO $t1 VALUES (1)")
+      sql(s"INSERT INTO $t1 VALUES (2)")
+      sql(s"INSERT INTO $t2 VALUES (3)")
+      sql(s"INSERT INTO $t2 VALUES (4)")
+
+      assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect
+        === Array(Row(1), Row(2)))
+      assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect
+        === Array(Row(3), Row(4)))
+      assert(sql("SELECT * FROM t FOR SYSTEM_VERSION AS OF 'Snapshot123456789'").collect
+        === Array(Row(1), Row(2)))
+      assert(sql("SELECT * FROM t FOR SYSTEM_VERSION AS OF 2345678910").collect
+        === Array(Row(3), Row(4)))
+    }
+
+    val ts1 = DateTimeUtils.stringToTimestampAnsi(
+      UTF8String.fromString("2019-01-29 00:37:58"),
+      DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val ts2 = DateTimeUtils.stringToTimestampAnsi(
+      UTF8String.fromString("2021-01-29 00:37:58"),
+      DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val t3 = s"testcat.t$ts1"
+    val t4 = s"testcat.t$ts2"
+
+    withTable(t3, t4) {
+      sql(s"CREATE TABLE $t3 (id int) USING foo")
+      sql(s"CREATE TABLE $t4 (id int) USING foo")
+
+      sql(s"INSERT INTO $t3 VALUES (5)")
+      sql(s"INSERT INTO $t3 VALUES (6)")
+      sql(s"INSERT INTO $t4 VALUES (7)")
+      sql(s"INSERT INTO $t4 VALUES (8)")
+
+      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect
+        === Array(Row(5), Row(6)))
+      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:37:58'").collect
+        === Array(Row(7), Row(8)))
+      assert(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF '2019-01-29 00:37:58'").collect
+        === Array(Row(5), Row(6)))
+      assert(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF '2021-01-29 00:37:58'").collect
+        === Array(Row(7), Row(8)))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 076dad7..3840dd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -17,24 +17,29 @@
 
 package org.apache.spark.sql.connector
 
+import java.util.Optional
+
 import scala.language.implicitConversions
 import scala.util.Try
 
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener}
+import org.apache.spark.unsafe.types.UTF8String
 
 class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with BeforeAndAfter {
 
@@ -271,6 +276,61 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     }
   }
 
+  test("mock time travel test") {
+    val t1 = s"$catalogName.tSnapshot123456789"
+    val t2 = s"$catalogName.t2345678910"
+    withTable(t1, t2) {
+      sql(s"create table $t1 (id bigint) using $format")
+      sql(s"create table $t2 (id bigint) using $format")
+
+      val df1 = spark.range(10)
+      df1.write.format(format).option("name", "tSnapshot123456789").option("catalog", catalogName)
+        .mode(SaveMode.Append).save()
+
+      val df2 = spark.range(10, 20)
+      df2.write.format(format).option("name", "t2345678910").option("catalog", catalogName)
+        .mode(SaveMode.Overwrite).save()
+
+      // load with version
+      checkAnswer(load("t", Some(catalogName), version = Some("Snapshot123456789")), df1.toDF())
+      checkAnswer(load("t", Some(catalogName), version = Some("2345678910")), df2.toDF())
+    }
+
+    val ts1 = DateTimeUtils.stringToTimestampAnsi(
+      UTF8String.fromString("2019-01-29 00:37:58"),
+      DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val ts2 = DateTimeUtils.stringToTimestampAnsi(
+      UTF8String.fromString("2021-01-29 00:37:58"),
+      DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val t3 = s"$catalogName.t$ts1"
+    val t4 = s"$catalogName.t$ts2"
+    withTable(t3, t4) {
+      sql(s"create table $t3 (id bigint) using $format")
+      sql(s"create table $t4 (id bigint) using $format")
+
+      val df3 = spark.range(30, 40)
+      df3.write.format(format).option("name", s"t$ts1").option("catalog", catalogName)
+        .mode(SaveMode.Append).save()
+
+      val df4 = spark.range(50, 60)
+      df4.write.format(format).option("name", s"t$ts2").option("catalog", catalogName)
+        .mode(SaveMode.Overwrite).save()
+
+      // load with timestamp
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+    }
+
+    val e = intercept[AnalysisException] {
+      load("t", Some(catalogName), version = Some("12345678"),
+        timestamp = Some("2019-01-29 00:37:58"))
+    }
+    assert(e.getMessage
+      .contains("Cannot specify both version and timestamp when scanning the table."))
+  }
+
   private def checkV2Identifiers(
       plan: LogicalPlan,
       identifier: String = "t1",
@@ -281,9 +341,19 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     assert(v2.catalog.exists(_ == catalogPlugin))
   }
 
-  private def load(name: String, catalogOpt: Option[String]): DataFrame = {
+  private def load(
+      name: String,
+      catalogOpt: Option[String],
+      version: Option[String] = None,
+      timestamp: Option[String] = None): DataFrame = {
     val dfr = spark.read.format(format).option("name", name)
     catalogOpt.foreach(cName => dfr.option("catalog", cName))
+    if (version.nonEmpty) {
+      dfr.option("versionAsOf", version.get)
+    }
+    if (timestamp.nonEmpty) {
+      dfr.option("timestampAsOf", timestamp.get)
+    }
     dfr.load()
   }
 
@@ -312,4 +382,20 @@ class CatalogSupportingInMemoryTableProvider
   override def extractCatalog(options: CaseInsensitiveStringMap): String = {
     options.get("catalog")
   }
+
+  override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] = {
+    if (options.get("versionAsOf") != null) {
+      Optional.of(options.get("versionAsOf"))
+    } else {
+      Optional.empty[String]
+    }
+  }
+
+  override def extractTimeTravelTimestamp(options: CaseInsensitiveStringMap): Optional[String] = {
+    if (options.get("timestampAsOf") != null) {
+      Optional.of(options.get("timestampAsOf"))
+    } else {
+      Optional.empty[String]
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 3769de0..10cc55b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -599,7 +599,7 @@ private[hive] class TestHiveQueryExecution(
   override lazy val analyzed: LogicalPlan = sparkSession.withActive {
     // Make sure any test tables referenced are loaded.
     val referencedTables = logical.collect {
-      case UnresolvedRelation(ident, _, _) =>
+      case UnresolvedRelation(ident, _, _, _) =>
         if (ident.length > 1 && ident.head.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) {
           ident.tail.asTableIdentifier
         } else ident.asTableIdentifier

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org