You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/22 09:24:33 UTC

[incubator-paimon] branch master updated: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0552563e [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)
b0552563e is described below

commit b0552563e245305e4ea7fe79d4c14990426369e5
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Wed Mar 22 17:24:28 2023 +0800

    [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)
---
 docs/content/how-to/querying-tables.md             |  38 +++++
 .../main/java/org/apache/paimon/CoreOptions.java   |   4 +
 .../paimon/table/AbstractFileStoreTable.java       |  35 +++-
 .../StaticFromTimestampStartingScanner.java        |  16 +-
 .../org/apache/paimon/utils/SnapshotManager.java   |  12 +-
 .../paimon/hive/mapred/PaimonInputFormat.java      |   9 +-
 paimon-spark/paimon-spark-3.3/pom.xml              |   2 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |  75 ++++++++-
 .../org/apache/paimon/spark/SparkCatalogBase.java  |  38 +++--
 .../java/org/apache/paimon/spark/SparkScan.java    |   6 +-
 .../org/apache/paimon/spark/SparkReadTestBase.java |   2 +-
 .../apache/paimon/spark/SparkTimeTravelITCase.java | 186 +++++++++++++++++++++
 12 files changed, 393 insertions(+), 30 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index ca3108ea5..0d2fe6803 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -159,6 +159,44 @@ INSERT INTO paimon_table SELECT * FROM kakfa_table;
 SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
 ```
 
+## Time Travel
+
+Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark 3.3+).
+
+{{< tabs "time-travel-example" >}}
+
+{{< tab "Flink" >}}
+
+You can use [dynamic table options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options) to specify the scan mode and where to start from:
+
+```sql
+-- travel to snapshot with id 1L
+SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+
+-- travel to specified timestamp with a long value in milliseconds
+SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
+```
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+You can use `VERSION AS OF` and `TIMESTAMP AS OF` in a query to do time travel:
+
+```sql
+-- travel to snapshot with id 1L (use snapshot id as version)
+SELECT * FROM t VERSION AS OF 1;
+
+-- travel to the specified timestamp
+SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
+
+-- you can also use a long value in seconds as the timestamp
+SELECT * FROM t TIMESTAMP AS OF 1678883047;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## System Tables
 
 System tables contain metadata and information about each table, such as the snapshots created and the options in use. Users can access system tables with batch queries.
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 6da5090d3..b2832a4aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -965,6 +965,10 @@ public class CoreOptions implements Serializable {
         if (options.contains(SCAN_TIMESTAMP_MILLIS) && !options.contains(SCAN_MODE)) {
             options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
         }
+
+        if (options.contains(SCAN_SNAPSHOT_ID) && !options.contains(SCAN_MODE)) {
+            options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
+        }
     }
 
     public static List<ConfigOption<?>> getOptions() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9f19331ae..40fbd0178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -37,6 +38,7 @@ import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.StreamDataTableScanImpl;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
+import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.Map;
@@ -119,12 +121,15 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         // set dynamic options with default values
         CoreOptions.setDefaultValues(newOptions);
 
-        // copy a new paimon to contain dynamic options
+        // copy a new table schema to contain dynamic options
         TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
 
-        // validate schema wit new options
+        // validate schema with new options
         SchemaValidation.validateTableSchema(newTableSchema);
 
+        // see if merged options contain time travel option
+        newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+
         return copy(newTableSchema);
     }
 
@@ -179,4 +184,30 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
                 options().writeOnly() ? null : store().newExpire(),
                 options().writeOnly() ? null : store().newPartitionExpire(commitUser));
     }
+
+    private Optional<TableSchema> tryTimeTravel(Options options) {
+        CoreOptions coreOptions = new CoreOptions(options);
+        Long snapshotId;
+
+        switch (coreOptions.startupMode()) {
+            case FROM_SNAPSHOT:
+                snapshotId = coreOptions.scanSnapshotId();
+                if (snapshotManager().snapshotExists(snapshotId)) {
+                    long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+                    return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+                }
+                return Optional.empty();
+            case FROM_TIMESTAMP:
+                Snapshot snapshot =
+                        StaticFromTimestampStartingScanner.getSnapshot(
+                                snapshotManager(), coreOptions.scanTimestampMills());
+                if (snapshot != null) {
+                    long schemaId = snapshot.schemaId();
+                    return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+                }
+                return Optional.empty();
+            default:
+                return Optional.empty();
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index e20aea4f0..037a9ce5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
@@ -26,6 +27,8 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
  * batch read.
@@ -44,18 +47,23 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
     @Override
     public DataTableScan.DataFilePlan getPlan(
             SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
-        Long startingSnapshotId = snapshotManager.earlierOrEqualTimeMills(startupMillis);
-        if (startingSnapshotId == null) {
+        Snapshot startingSnapshot = getSnapshot(snapshotManager, startupMillis);
+        if (startingSnapshot == null) {
             LOG.debug(
                     "There is currently no snapshot earlier than or equal to timestamp[{}]",
                     startupMillis);
             return null;
         }
         return new DataTableScan.DataFilePlan(
-                startingSnapshotId,
+                startingSnapshot.id(),
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
-                        .withSnapshot(startingSnapshotId)
+                        .withSnapshot(startingSnapshot.id())
                         .splits());
     }
+
+    @Nullable
+    public static Snapshot getSnapshot(SnapshotManager snapshotManager, long timestamp) {
+        return snapshotManager.earlierOrEqualTimeMills(timestamp);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c0bf6a73f..a8388a740 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -135,8 +135,11 @@ public class SnapshotManager implements Serializable {
         return earliest - 1;
     }
 
-    /** Returns a snapshot earlier than or equals to the timestamp mills. */
-    public @Nullable Long earlierOrEqualTimeMills(long timestampMills) {
+    /**
+     * Returns a {@link Snapshot} whose commit time is earlier than or equal to the given
+     * timestamp in milliseconds. If there is no such snapshot, returns null.
+     */
+    public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
         Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
         if (earliest == null || latest == null) {
@@ -144,9 +147,10 @@ public class SnapshotManager implements Serializable {
         }
 
         for (long i = latest; i >= earliest; i--) {
-            long commitTime = snapshot(i).timeMillis();
+            Snapshot snapshot = snapshot(i);
+            long commitTime = snapshot.timeMillis();
             if (commitTime <= timestampMills) {
-                return i;
+                return snapshot;
             }
         }
         return null;
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index f707ba843..98ca11288 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -55,7 +58,11 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
         FileStoreTable table = createFileStoreTable(jobConf);
         DataTableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
-        return scan.plan().splits.stream()
+
+        // TODO: Roll back modification after refactoring scan interface
+        DataTableScan.DataFilePlan plan = scan.plan();
+        List<DataSplit> splits = plan == null ? Collections.emptyList() : plan.splits;
+        return splits.stream()
                 .map(split -> new PaimonInputSplit(table.location().toString(), split))
                 .toArray(PaimonInputSplit[]::new);
     }
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml
index de3c1a6c3..2cbb8e9c6 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 3.3</name>
 
     <properties>
-        <spark.version>3.3.0</spark.version>
+        <spark.version>3.3.2</spark.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index ab03200fe..d59abcdec 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -18,7 +18,80 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkCatalogBase {}
+public class SparkCatalog extends SparkCatalogBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Spark 3.2-.
+     */
+    public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+        Table table = loadAndCheck(ident);
+        long snapshotId;
+
+        try {
+            snapshotId = Long.parseUnsignedLong(version);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Version for time travel should be a LONG value representing snapshot id but was '%s'.",
+                            version),
+                    e);
+        }
+
+        LOG.info("Time travel target snapshot id is {}.", snapshotId);
+
+        Options dynamicOptions = new Options().set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
+        return new SparkTable(
+                table.copy(dynamicOptions.toMap()),
+                Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain compatibility with Spark 3.2-.
+     *
+     * <p>NOTE: Time unit of timestamp here is microsecond (see {@link
+     * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use seconds.
+     */
+    public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+        Table table = loadAndCheck(ident);
+        // Spark passes microseconds, but Paimon's timestamps are in milliseconds
+        timestamp = timestamp / 1000;
+
+        LOG.info("Time travel target timestamp is {} milliseconds.", timestamp);
+
+        Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
+        return new SparkTable(
+                table.copy(option.toMap()),
+                Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+    }
+
+    private Table loadAndCheck(Identifier ident) throws NoSuchTableException {
+        try {
+            Table table = load(ident);
+            if (!(table instanceof DataTable)) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Only DataTable supports time travel but given table type is '%s'.",
+                                table.getClass().getName()));
+            }
+            return table;
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
index 88d65cf1a..f1c903ca7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
@@ -69,7 +69,7 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
     private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
 
     private String name = null;
-    private Catalog catalog = null;
+    protected Catalog catalog = null;
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
@@ -205,7 +205,7 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
     public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
         try {
             return new SparkTable(
-                    catalog.getTable(toIdentifier(ident)),
+                    load(ident),
                     Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
@@ -360,7 +360,21 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
         return namespace.length == 1;
     }
 
-    private org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
+    @Override
+    public void renameTable(Identifier oldIdent, Identifier newIdent)
+            throws NoSuchTableException, TableAlreadyExistsException {
+        try {
+            catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(oldIdent);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new TableAlreadyExistsException(newIdent);
+        }
+    }
+
+    // --------------------- tools ------------------------------------------
+
+    protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
             throws NoSuchTableException {
         if (!isValidateNamespace(ident.namespace())) {
             throw new NoSuchTableException(ident);
@@ -369,22 +383,16 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
         return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
     }
 
+    /** Load a Paimon table. */
+    protected org.apache.paimon.table.Table load(Identifier ident)
+            throws Catalog.TableNotExistException, NoSuchTableException {
+        return catalog.getTable(toIdentifier(ident));
+    }
+
     // --------------------- unsupported methods ----------------------------
 
     @Override
     public void alterNamespace(String[] namespace, NamespaceChange... changes) {
         throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
     }
-
-    @Override
-    public void renameTable(Identifier oldIdent, Identifier newIdent)
-            throws NoSuchTableException, TableAlreadyExistsException {
-        try {
-            catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(oldIdent);
-        } catch (Catalog.TableAlreadyExistException e) {
-            throw new TableAlreadyExistsException(newIdent);
-        }
-    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index d05081fa0..ecf0c6de2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark;
 
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
@@ -29,6 +30,7 @@ import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
 
@@ -77,7 +79,9 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
     protected List<Split> splits() {
         if (splits == null) {
-            this.splits = readBuilder.newScan().plan().splits();
+            // TODO: Roll back modification after refactoring scan interface
+            TableScan.Plan plan = readBuilder.newScan().plan();
+            splits = plan == null ? Collections.emptyList() : plan.splits();
         }
         return splits;
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 0519dfb5b..3a3cb0b52 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -168,7 +168,7 @@ public abstract class SparkReadTestBase {
                         tableName));
     }
 
-    private static void writeTable(String tableName, GenericRow... rows) throws Exception {
+    protected static void writeTable(String tableName, GenericRow... rows) throws Exception {
         FileStoreTable fileStoreTable =
                 FileStoreTableFactory.create(
                         LocalFileIO.create(),
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
new file mode 100644
index 000000000..e81d9ba91
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Spark 3.3+ time travel syntax (VERSION AS OF, TIMESTAMP AS OF). */
+public class SparkTimeTravelITCase extends SparkReadTestBase {
+    @Test
+    public void testTravelToVersion() throws Exception {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test")),
+                GenericRow.of(4, BinaryString.fromString("Case")));
+
+        assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+        // time travel to snapshot 1
+        assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon]]");
+    }
+
+    @Test
+    public void testTravelToTimestampString() throws Exception {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        String anchor = LocalDateTime.now().toString();
+        // Thread.sleep(1000);
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test")),
+                GenericRow.of(4, BinaryString.fromString("Case")));
+
+        assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+        // time travel to snapshot 1
+        assertThat(
+                        spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF '%s'", anchor))
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[1,Hello], [2,Paimon]]");
+    }
+
+    @Test
+    public void testTravelToTimestampNumber() throws Exception {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        Thread.sleep(1000); // avoid precision problem
+        long anchor = System.currentTimeMillis() / 1000; // convert to seconds
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test")),
+                GenericRow.of(4, BinaryString.fromString("Case")));
+
+        assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+        // time travel to snapshot 1
+        assertThat(
+                        spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor))
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[1,Hello], [2,Paimon]]");
+    }
+
+    @Test
+    public void testTravelToOldSchema() throws Exception {
+        // old schema
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        // new schema
+        spark.sql("ALTER TABLE t ADD COLUMN dt STRING");
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test"), BinaryString.fromString("0401")),
+                GenericRow.of(4, BinaryString.fromString("Case"), BinaryString.fromString("0402")));
+
+        assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+                .isEqualTo("[[1,Hello,null], [2,Paimon,null], [3,Test,0401], [4,Case,0402]]");
+
+        // verify that column dt is not visible after time travel
+        assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon]]");
+    }
+
+    @Test
+    public void testTravelToNonExistedVersion() {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        assertThat(spark.sql("SELECT * FROM t VERSION AS OF 2").collectAsList()).isEmpty();
+    }
+
+    @Test
+    public void testTravelToNonExistedTimestamp() {
+        long anchor = System.currentTimeMillis() / 1000;
+
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        assertThat(
+                        spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor))
+                                .collectAsList())
+                .isEmpty();
+    }
+
+    @Test
+    public void testIllegalVersionString() {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF '1.5'"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Version for time travel should be a LONG value representing snapshot id but was '1.5'."));
+    }
+
+    @Test
+    public void testUnsupportedSystemTableTimeTravel() {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'"));
+    }
+}