Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/22 09:24:33 UTC
[incubator-paimon] branch master updated: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0552563e [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)
b0552563e is described below
commit b0552563e245305e4ea7fe79d4c14990426369e5
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Wed Mar 22 17:24:28 2023 +0800
[spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF) (#659)
---
docs/content/how-to/querying-tables.md | 38 +++++
.../main/java/org/apache/paimon/CoreOptions.java | 4 +
.../paimon/table/AbstractFileStoreTable.java | 35 +++-
.../StaticFromTimestampStartingScanner.java | 16 +-
.../org/apache/paimon/utils/SnapshotManager.java | 12 +-
.../paimon/hive/mapred/PaimonInputFormat.java | 9 +-
paimon-spark/paimon-spark-3.3/pom.xml | 2 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 75 ++++++++-
.../org/apache/paimon/spark/SparkCatalogBase.java | 38 +++--
.../java/org/apache/paimon/spark/SparkScan.java | 6 +-
.../org/apache/paimon/spark/SparkReadTestBase.java | 2 +-
.../apache/paimon/spark/SparkTimeTravelITCase.java | 186 +++++++++++++++++++++
12 files changed, 393 insertions(+), 30 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index ca3108ea5..0d2fe6803 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -159,6 +159,44 @@ INSERT INTO paimon_table SELECT * FROM kafka_table;
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
```
+## Time Travel
+
+Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark 3.3+).
+
+{{< tabs "time-travel-example" >}}
+
+{{< tab "Flink" >}}
+You can use [dynamic table options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options) to specify the scan mode and the position to start from:
+
+```sql
+-- travel to snapshot with id 1L
+SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+
+-- travel to specified timestamp with a long value in milliseconds
+SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
+```
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+You can use `VERSION AS OF` and `TIMESTAMP AS OF` in queries to perform time travel:
+
+```sql
+-- travel to snapshot with id 1L (use snapshot id as version)
+SELECT * FROM t VERSION AS OF 1;
+
+-- travel to specified timestamp
+SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
+
+-- you can also use a long value in seconds as the timestamp
+SELECT * FROM t TIMESTAMP AS OF 1678883047;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
## System Tables
System tables contain metadata and information about each table, such as the snapshots created and the options in use. Users can access system tables with batch queries.
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 6da5090d3..b2832a4aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -965,6 +965,10 @@ public class CoreOptions implements Serializable {
if (options.contains(SCAN_TIMESTAMP_MILLIS) && !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
}
+
+ if (options.contains(SCAN_SNAPSHOT_ID) && !options.contains(SCAN_MODE)) {
+ options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
+ }
}
public static List<ConfigOption<?>> getOptions() {
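The new defaulting rule mirrors the existing one for `scan.timestamp-millis`: setting `scan.snapshot-id` alone now implies the `FROM_SNAPSHOT` startup mode. A minimal sketch of the intended behavior, assuming only the option and accessor names visible in this diff:

```java
// Hedged sketch: option/accessor names are taken from this diff, not verified
// against any particular Paimon release.
Options options = new Options();
options.set(CoreOptions.SCAN_SNAPSHOT_ID, 1L); // user sets only the snapshot id
CoreOptions.setDefaultValues(options);         // SCAN_MODE now defaults to FROM_SNAPSHOT
assert new CoreOptions(options).startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT;
```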
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9f19331ae..40fbd0178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -37,6 +38,7 @@ import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.StreamDataTableScanImpl;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
+import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.SnapshotManager;
import java.util.Map;
@@ -119,12 +121,15 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
// set dynamic options with default values
CoreOptions.setDefaultValues(newOptions);
- // copy a new paimon to contain dynamic options
+ // copy a new table schema to contain dynamic options
TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
- // validate schema wit new options
+ // validate schema with new options
SchemaValidation.validateTableSchema(newTableSchema);
+ // see if merged options contain time travel option
+ newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+
return copy(newTableSchema);
}
@@ -179,4 +184,30 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
options().writeOnly() ? null : store().newExpire(),
options().writeOnly() ? null : store().newPartitionExpire(commitUser));
}
+
+ private Optional<TableSchema> tryTimeTravel(Options options) {
+ CoreOptions coreOptions = new CoreOptions(options);
+ Long snapshotId;
+
+ switch (coreOptions.startupMode()) {
+ case FROM_SNAPSHOT:
+ snapshotId = coreOptions.scanSnapshotId();
+ if (snapshotManager().snapshotExists(snapshotId)) {
+ long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+ return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+ }
+ return Optional.empty();
+ case FROM_TIMESTAMP:
+ Snapshot snapshot =
+ StaticFromTimestampStartingScanner.getSnapshot(
+ snapshotManager(), coreOptions.scanTimestampMills());
+ if (snapshot != null) {
+ long schemaId = snapshot.schemaId();
+ return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+ }
+ return Optional.empty();
+ default:
+ return Optional.empty();
+ }
+ }
}
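In effect, a dynamic-options copy becomes the single entry point for time travel: when the merged options select `FROM_SNAPSHOT` or `FROM_TIMESTAMP`, `tryTimeTravel` resolves the target snapshot's `schemaId` and copies the table with that historical schema, so columns added later are not visible. A hedged usage sketch, assuming `table` is an existing `FileStoreTable`:

```java
// Hedged sketch; `table` is assumed to be an existing FileStoreTable instance,
// and the option key comes from the docs hunk above.
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put("scan.snapshot-id", "1");
// copy() merges the options and, via tryTimeTravel(), swaps in the schema
// that snapshot 1 was written with.
FileStoreTable travelled = table.copy(dynamicOptions);
```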
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index e20aea4f0..037a9ce5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.utils.SnapshotManager;
@@ -26,6 +27,8 @@ import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
* batch read.
@@ -44,18 +47,23 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
@Override
public DataTableScan.DataFilePlan getPlan(
SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
- Long startingSnapshotId = snapshotManager.earlierOrEqualTimeMills(startupMillis);
- if (startingSnapshotId == null) {
+ Snapshot startingSnapshot = getSnapshot(snapshotManager, startupMillis);
+ if (startingSnapshot == null) {
LOG.debug(
"There is currently no snapshot earlier than or equal to timestamp[{}]",
startupMillis);
return null;
}
return new DataTableScan.DataFilePlan(
- startingSnapshotId,
+ startingSnapshot.id(),
snapshotSplitReader
.withKind(ScanKind.ALL)
- .withSnapshot(startingSnapshotId)
+ .withSnapshot(startingSnapshot.id())
.splits());
}
+
+ @Nullable
+ public static Snapshot getSnapshot(SnapshotManager snapshotManager, long timestamp) {
+ return snapshotManager.earlierOrEqualTimeMills(timestamp);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c0bf6a73f..a8388a740 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -135,8 +135,11 @@ public class SnapshotManager implements Serializable {
return earliest - 1;
}
- /** Returns a snapshot earlier than or equals to the timestamp mills. */
- public @Nullable Long earlierOrEqualTimeMills(long timestampMills) {
+ /**
+ * Returns the latest {@link Snapshot} whose commit time is earlier than or equal to the
+ * given timestamp in milliseconds, or null if there is no such snapshot.
+ */
+ public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
@@ -144,9 +147,10 @@ public class SnapshotManager implements Serializable {
}
for (long i = latest; i >= earliest; i--) {
- long commitTime = snapshot(i).timeMillis();
+ Snapshot snapshot = snapshot(i);
+ long commitTime = snapshot.timeMillis();
if (commitTime <= timestampMills) {
- return i;
+ return snapshot;
}
}
return null;
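The method now returns the whole `Snapshot` instead of only its id, since callers such as `tryTimeTravel` also need the `schemaId`. The lookup semantics are unchanged: walk from the latest snapshot backwards and take the first one committed at or before the target time. A standalone illustration, with a hypothetical `Snap` record standing in for the real `Snapshot` class:

```java
import java.util.List;

// Standalone sketch; Snap is a hypothetical stand-in for org.apache.paimon.Snapshot.
class EarlierOrEqualDemo {
    record Snap(long id, long timeMillis) {}

    /** Latest snapshot committed at or before timestampMillis, or null if none. */
    static Snap earlierOrEqual(List<Snap> oldestFirst, long timestampMillis) {
        for (int i = oldestFirst.size() - 1; i >= 0; i--) {
            Snap s = oldestFirst.get(i);
            if (s.timeMillis() <= timestampMillis) {
                return s;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Snap> snaps = List.of(new Snap(1, 1_000), new Snap(2, 2_000));
        System.out.println(earlierOrEqual(snaps, 1_500)); // Snap[id=1, timeMillis=1000]
    }
}
```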
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index f707ba843..98ca11288 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ReadBuilder;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
/**
@@ -55,7 +58,11 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
FileStoreTable table = createFileStoreTable(jobConf);
DataTableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
- return scan.plan().splits.stream()
+
+ // TODO: Roll back modification after refactoring scan interface
+ DataTableScan.DataFilePlan plan = scan.plan();
+ List<DataSplit> splits = plan == null ? Collections.emptyList() : plan.splits;
+ return splits.stream()
.map(split -> new PaimonInputSplit(table.location().toString(), split))
.toArray(PaimonInputSplit[]::new);
}
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml
index de3c1a6c3..2cbb8e9c6 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -32,7 +32,7 @@ under the License.
<name>Paimon : Spark : 3.3</name>
<properties>
- <spark.version>3.3.0</spark.version>
+ <spark.version>3.3.2</spark.version>
</properties>
<dependencies>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index ab03200fe..d59abcdec 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -18,7 +18,80 @@
package org.apache.paimon.spark;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkCatalogBase {}
+public class SparkCatalog extends SparkCatalogBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain compatibility with Spark 3.2 and earlier.
+ */
+ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+ Table table = loadAndCheck(ident);
+ long snapshotId;
+
+ try {
+ snapshotId = Long.parseUnsignedLong(version);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Version for time travel should be a LONG value representing snapshot id but was '%s'.",
+ version),
+ e);
+ }
+
+ LOG.info("Time travel target snapshot id is {}.", snapshotId);
+
+ Options dynamicOptions = new Options().set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
+ return new SparkTable(
+ table.copy(dynamicOptions.toMap()),
+ Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain compatibility with Spark 3.2 and earlier.
+ *
+ * <p>NOTE: The time unit of the timestamp here is microseconds (see {@link
+ * TableCatalog#loadTable(Identifier, long)}), while in SQL you should use seconds.
+ */
+ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+ Table table = loadAndCheck(ident);
+ // Paimon timestamps are in milliseconds; convert from Spark's microseconds
+ timestamp = timestamp / 1000;
+
+ LOG.info("Time travel target timestamp is {} milliseconds.", timestamp);
+
+ Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
+ return new SparkTable(
+ table.copy(option.toMap()),
+ Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+ }
+
+ private Table loadAndCheck(Identifier ident) throws NoSuchTableException {
+ try {
+ Table table = load(ident);
+ if (!(table instanceof DataTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only DataTable supports time travel but given table type is '%s'.",
+ table.getClass().getName()));
+ }
+ return table;
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(ident);
+ }
+ }
+}
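The unit bookkeeping here is easy to get wrong: the SQL literal is in seconds, Spark's `loadTable(Identifier, long)` hands the catalog microseconds, and Paimon's `scan.timestamp-millis` expects milliseconds. A small sketch of the conversions implied by this diff, with illustrative constants:

```java
// Hedged sketch of the unit conversions; the constants are illustrative.
long sqlSeconds   = 1678883047L;             // what you write: TIMESTAMP AS OF 1678883047
long sparkMicros  = sqlSeconds * 1_000_000L; // what Spark passes to loadTable(ident, timestamp)
long paimonMillis = sparkMicros / 1_000L;    // what SCAN_TIMESTAMP_MILLIS expects (the "/ 1000" above)
System.out.println(paimonMillis);            // 1678883047000
```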
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
index 88d65cf1a..f1c903ca7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
@@ -69,7 +69,7 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
private String name = null;
- private Catalog catalog = null;
+ protected Catalog catalog = null;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
@@ -205,7 +205,7 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
return new SparkTable(
- catalog.getTable(toIdentifier(ident)),
+ load(ident),
Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -360,7 +360,21 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
return namespace.length == 1;
}
- private org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
+ @Override
+ public void renameTable(Identifier oldIdent, Identifier newIdent)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ try {
+ catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(oldIdent);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistsException(newIdent);
+ }
+ }
+
+ // --------------------- tools ------------------------------------------
+
+ protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
throws NoSuchTableException {
if (!isValidateNamespace(ident.namespace())) {
throw new NoSuchTableException(ident);
@@ -369,22 +383,16 @@ public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespac
return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
}
+ /** Load a Paimon table. */
+ protected org.apache.paimon.table.Table load(Identifier ident)
+ throws Catalog.TableNotExistException, NoSuchTableException {
+ return catalog.getTable(toIdentifier(ident));
+ }
+
// --------------------- unsupported methods ----------------------------
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes) {
throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
}
-
- @Override
- public void renameTable(Identifier oldIdent, Identifier newIdent)
- throws NoSuchTableException, TableAlreadyExistsException {
- try {
- catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
- } catch (Catalog.TableNotExistException e) {
- throw new NoSuchTableException(oldIdent);
- } catch (Catalog.TableAlreadyExistException e) {
- throw new TableAlreadyExistsException(newIdent);
- }
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index d05081fa0..ecf0c6de2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -29,6 +30,7 @@ import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.types.StructType;
+import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
@@ -77,7 +79,9 @@ public class SparkScan implements Scan, SupportsReportStatistics {
protected List<Split> splits() {
if (splits == null) {
- this.splits = readBuilder.newScan().plan().splits();
+ // TODO: Roll back modification after refactoring scan interface
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ splits = plan == null ? Collections.emptyList() : plan.splits();
}
return splits;
}
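This is the same defensive pattern as the `PaimonInputFormat` change above: a time-travel scan may legitimately find no snapshot, so `plan()` can return null and must be mapped to an empty split list rather than a NullPointerException. A minimal standalone sketch, with a hypothetical `Plan` interface standing in for `TableScan.Plan`:

```java
import java.util.Collections;
import java.util.List;

// Standalone sketch; Plan is a hypothetical stand-in for TableScan.Plan.
class NullPlanGuard {
    interface Plan {
        List<String> splits();
    }

    static List<String> splitsOf(Plan plan) {
        // No snapshot at the requested point in time means no plan, which in
        // turn means no splits to read.
        return plan == null ? Collections.emptyList() : plan.splits();
    }
}
```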
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 0519dfb5b..3a3cb0b52 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -168,7 +168,7 @@ public abstract class SparkReadTestBase {
tableName));
}
- private static void writeTable(String tableName, GenericRow... rows) throws Exception {
+ protected static void writeTable(String tableName, GenericRow... rows) throws Exception {
FileStoreTable fileStoreTable =
FileStoreTableFactory.create(
LocalFileIO.create(),
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
new file mode 100644
index 000000000..e81d9ba91
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Spark 3.3+ time travel syntax (VERSION AS OF, TIMESTAMP AS OF). */
+public class SparkTimeTravelITCase extends SparkReadTestBase {
+ @Test
+ public void testTravelToVersion() throws Exception {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test")),
+ GenericRow.of(4, BinaryString.fromString("Case")));
+
+ assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+ // time travel to snapshot 1
+ assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon]]");
+ }
+
+ @Test
+ public void testTravelToTimestampString() throws Exception {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ String anchor = LocalDateTime.now().toString();
+ // Thread.sleep(1000);
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test")),
+ GenericRow.of(4, BinaryString.fromString("Case")));
+
+ assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+ // time travel to snapshot 1
+ assertThat(
+ spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF '%s'", anchor))
+ .collectAsList()
+ .toString())
+ .isEqualTo("[[1,Hello], [2,Paimon]]");
+ }
+
+ @Test
+ public void testTravelToTimestampNumber() throws Exception {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ Thread.sleep(1000); // ensure the second-precision anchor does not round below snapshot 1's commit time
+ long anchor = System.currentTimeMillis() / 1000; // convert to seconds
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test")),
+ GenericRow.of(4, BinaryString.fromString("Case")));
+
+ assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+
+ // time travel to snapshot 1
+ assertThat(
+ spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor))
+ .collectAsList()
+ .toString())
+ .isEqualTo("[[1,Hello], [2,Paimon]]");
+ }
+
+ @Test
+ public void testTravelToOldSchema() throws Exception {
+ // old schema
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ // new schema
+ spark.sql("ALTER TABLE t ADD COLUMN dt STRING");
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test"), BinaryString.fromString("0401")),
+ GenericRow.of(4, BinaryString.fromString("Case"), BinaryString.fromString("0402")));
+
+ assertThat(spark.sql("SELECT * FROM t").collectAsList().toString())
+ .isEqualTo("[[1,Hello,null], [2,Paimon,null], [3,Test,0401], [4,Case,0402]]");
+
+ // column dt should not be visible when traveling back to snapshot 1
+ assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon]]");
+ }
+
+ @Test
+ public void testTravelToNonExistedVersion() {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ assertThat(spark.sql("SELECT * FROM t VERSION AS OF 2").collectAsList()).isEmpty();
+ }
+
+ @Test
+ public void testTravelToNonExistedTimestamp() {
+ long anchor = System.currentTimeMillis() / 1000;
+
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ assertThat(
+ spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", anchor))
+ .collectAsList())
+ .isEmpty();
+ }
+
+ @Test
+ public void testIllegalVersionString() {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF '1.5'"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "Version for time travel should be a LONG value representing snapshot id but was '1.5'."));
+ }
+
+ @Test
+ public void testUnsupportedSystemTableTimeTravel() {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'"));
+ }
+}