Posted to issues@paimon.apache.org by "yuzelin (via GitHub)" <gi...@apache.org> on 2023/03/20 06:46:53 UTC

[GitHub] [incubator-paimon] yuzelin opened a new pull request, #659: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF)

yuzelin opened a new pull request, #659:
URL: https://github.com/apache/incubator-paimon/pull/659

   ### Purpose
   
   To support the time travel syntax of Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF).
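   
   For context, a sketch of the Spark 3.3 syntax this enables; the table name `t` and the literals are illustrative:
   
   ```sql
   -- read the table as of snapshot id 1
   SELECT * FROM t VERSION AS OF 1;
   
   -- read the table as of a point in time
   -- (in SQL, a numeric timestamp is given in seconds)
   SELECT * FROM t TIMESTAMP AS OF '2023-03-15 12:00:00';
   ```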
   
   ### Tests
   
   `SparkTimeTravelITCase`
   
   ### API and Format 
   
   No changes.
   
   ### Documentation
   Added a `Time Travel` section to the how-to docs (how-to#Time Travel).
   




[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #659: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF)

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #659:
URL: https://github.com/apache/incubator-paimon/pull/659#discussion_r1141686360


##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.DeleteColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnPosition;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
+
+/** Base implementation of Spark {@link TableCatalog} for paimon. */
+public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespaces {

Review Comment:
   We can have a dedicated PR to extract `SparkCatalogBase`.





[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #659: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF)

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #659:
URL: https://github.com/apache/incubator-paimon/pull/659#discussion_r1142102080


##########
docs/content/how-to/querying-tables.md:
##########
@@ -91,6 +91,44 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode, stream reading will end until a larger watermark snapshot is encountered.
 {{< /hint >}}
 
+## Time Travel
+
+Currently, Table Store supports time travel for Flink and Spark 3 (requires Spark 3.3+).
+
+{{< tabs "time-travel-example" >}}
+
+{{< tab "Flink" >}}
+
+you can use [dynamic table options](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#dynamic-table-options) to specify scan mode and from where to start:

Review Comment:
   use `flink-docs-master`?



##########
docs/content/how-to/querying-tables.md:
##########
@@ -91,6 +91,44 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode, stream reading will end until a larger watermark snapshot is encountered.
 {{< /hint >}}
 
+## Time Travel
+
+Currently, Table Store supports time travel for Flink and Spark 3 (requires Spark 3.3+).
+
+{{< tabs "time-travel-example" >}}
+
+{{< tab "Flink" >}}
+
+you can use [dynamic table options](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#dynamic-table-options) to specify scan mode and from where to start:
+
+```sql
+-- travel to snapshot with id 1L
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;

Review Comment:
   remove scan.mode?
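   
   For illustration, the quoted query without `scan.mode`, assuming Paimon infers the scan mode once `scan.snapshot-id` is set (the assumption behind this suggestion):
   
   ```sql
   -- travel to snapshot with id 1
   SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
   ```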



##########
docs/content/how-to/querying-tables.md:
##########
@@ -91,6 +91,44 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode, stream reading will end until a larger watermark snapshot is encountered.
 {{< /hint >}}
 
+## Time Travel
+
+Currently, Table Store supports time travel for Flink and Spark 3 (requires Spark 3.3+).
+
+{{< tabs "time-travel-example" >}}
+
+{{< tab "Flink" >}}
+
+you can use [dynamic table options](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#dynamic-table-options) to specify scan mode and from where to start:
+
+```sql
+-- travel to snapshot with id 1L
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;
+
+-- travel to specified timestamp with a long value in milliseconds
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-timestamp', 'scan.timestamp-millis' = '1678883047356') */;

Review Comment:
   remove scan.mode?
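   
   Likewise for the timestamp hint, assuming `scan.timestamp-millis` alone implies the scan mode:
   
   ```sql
   -- travel to the specified timestamp in milliseconds
   SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
   ```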



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -179,4 +187,32 @@ public TableCommitImpl newCommit(String commitUser) {
                 options().writeOnly() ? null : store().newExpire(),
                 options().writeOnly() ? null : store().newPartitionExpire(commitUser));
     }
+
+    private Optional<TableSchema> tryTimeTravel(Options options) {
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {

Review Comment:
   can you use a switch case?
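   
   A sketch of that shape, reusing the names from the quoted diff; `travelToSnapshot` and `travelToTimestamp` stand for hypothetical extractions of the two if-branches, not existing methods:
   
   ```java
   private Optional<TableSchema> tryTimeTravel(Options options) {
       CoreOptions coreOptions = new CoreOptions(options);
       switch (coreOptions.startupMode()) {
           case FROM_SNAPSHOT:
               // the scan.snapshot-id branch of the quoted diff
               return travelToSnapshot(coreOptions.scanSnapshotId(), options);
           case FROM_TIMESTAMP:
               // the scan.timestamp-millis branch of the quoted diff
               return travelToTimestamp(coreOptions.scanTimestampMills(), options);
           default:
               return Optional.empty();
       }
   }
   ```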



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -179,4 +187,32 @@ public TableCommitImpl newCommit(String commitUser) {
                 options().writeOnly() ? null : store().newExpire(),
                 options().writeOnly() ? null : store().newPartitionExpire(commitUser));
     }
+
+    private Optional<TableSchema> tryTimeTravel(Options options) {
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+            long snapshotId = coreOptions.scanSnapshotId();
+            if (snapshotManager().snapshotExists(snapshotId)) {
+                long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+                LOG.info("Time traveling to snapshot {}, schemaId is {}.", snapshotId, schemaId);
+                return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+            }
+        }
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+            long timestamp = coreOptions.scanTimestampMills();
+            Long snapshotId = snapshotManager().earlierOrEqualTimeMills(timestamp);

Review Comment:
   Maybe we can just create a new `StaticFromTimestampStartingScanner` to get the snapshot.
   Provide a `StaticFromTimestampStartingScanner.getSnapshot(SnapshotManager, long timestamp)`?
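   
   A sketch of the proposed helper, built only from the `SnapshotManager` calls already used in the quoted diff (class scaffolding and imports omitted):
   
   ```java
   /** Latest snapshot committed at or before the given timestamp, or null if none exists. */
   public static Snapshot getSnapshot(SnapshotManager snapshotManager, long timestampMills) {
       Long snapshotId = snapshotManager.earlierOrEqualTimeMills(timestampMills);
       return snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
   }
   ```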



##########
paimon-spark/paimon-spark-3.3/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.spark.SparkCatalogBase;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark {@link TableCatalog} for paimon. */
+public class SparkCatalog extends SparkCatalogBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+
+    @Override
+    public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+        Table table = loadAndCheck(ident);
+        long snapshotId;
+
+        try {
+            snapshotId = Long.parseUnsignedLong(version);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Version for time travel should be a LONG value representing snapshot id but was '%s'.",
+                            version),
+                    e);
+        }
+
+        LOG.info("Time travel target snapshot id is {}.", snapshotId);
+
+        Options dynamicOptions =
+                new Options()
+                        .set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FROM_SNAPSHOT)

Review Comment:
   Don't set scan mode.
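   
   That is, a sketch of the quoted options with the scan mode dropped, assuming `SCAN_SNAPSHOT_ID` alone is enough to trigger time travel:
   
   ```java
   Options dynamicOptions = new Options().set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
   ```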



##########
docs/content/how-to/querying-tables.md:
##########
@@ -91,6 +91,44 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode, stream reading will end until a larger watermark snapshot is encountered.
 {{< /hint >}}
 
+## Time Travel
+
+Currently, Table Store supports time travel for Flink and Spark 3 (requires Spark 3.3+).

Review Comment:
   Paimon



##########
paimon-spark/paimon-spark-3.3/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.spark.SparkCatalogBase;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark {@link TableCatalog} for paimon. */
+public class SparkCatalog extends SparkCatalogBase {

Review Comment:
   Maybe we can introduce this in spark-common, and make spark-common depend on Spark 3.3?



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -119,12 +124,15 @@ public FileStoreTable copy(Map<String, String> dynamicOptions) {
         // set dynamic options with default values
         CoreOptions.setDefaultValues(newOptions);

Review Comment:
   Maybe we should add the snapshot id to `CoreOptions.setDefaultValues`.
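   
   A sketch of what that could look like inside `CoreOptions.setDefaultValues` (hypothetical; the exact option plumbing is an assumption):
   
   ```java
   // if a snapshot id is given but no scan mode, default to from-snapshot
   if (options.contains(SCAN_SNAPSHOT_ID) && !options.contains(SCAN_MODE)) {
       options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
   }
   ```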



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -179,4 +187,32 @@ public TableCommitImpl newCommit(String commitUser) {
                 options().writeOnly() ? null : store().newExpire(),
                 options().writeOnly() ? null : store().newPartitionExpire(commitUser));
     }
+
+    private Optional<TableSchema> tryTimeTravel(Options options) {
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+            long snapshotId = coreOptions.scanSnapshotId();
+            if (snapshotManager().snapshotExists(snapshotId)) {
+                long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+                LOG.info("Time traveling to snapshot {}, schemaId is {}.", snapshotId, schemaId);

Review Comment:
   maybe we don't need this log



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -179,4 +187,32 @@ public TableCommitImpl newCommit(String commitUser) {
                 options().writeOnly() ? null : store().newExpire(),
                 options().writeOnly() ? null : store().newPartitionExpire(commitUser));
     }
+
+    private Optional<TableSchema> tryTimeTravel(Options options) {
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
+            long snapshotId = coreOptions.scanSnapshotId();
+            if (snapshotManager().snapshotExists(snapshotId)) {
+                long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+                LOG.info("Time traveling to snapshot {}, schemaId is {}.", snapshotId, schemaId);
+                return Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
+            }
+        }
+
+        if (coreOptions.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+            long timestamp = coreOptions.scanTimestampMills();
+            Long snapshotId = snapshotManager().earlierOrEqualTimeMills(timestamp);
+            if (snapshotId != null) {
+                long schemaId = snapshotManager().snapshot(snapshotId).schemaId();
+                LOG.info(

Review Comment:
   ditto



##########
paimon-spark/paimon-spark-3.3/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.spark.SparkCatalogBase;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark {@link TableCatalog} for paimon. */
+public class SparkCatalog extends SparkCatalogBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+
+    @Override
+    public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+        Table table = loadAndCheck(ident);
+        long snapshotId;
+
+        try {
+            snapshotId = Long.parseUnsignedLong(version);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Version for time travel should be a LONG value representing snapshot id but was '%s'.",
+                            version),
+                    e);
+        }
+
+        LOG.info("Time travel target snapshot id is {}.", snapshotId);
+
+        Options dynamicOptions =
+                new Options()
+                        .set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FROM_SNAPSHOT)
+                        .set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
+        return new SparkTable(
+                table.copy(dynamicOptions.toMap()),
+                Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+    }
+
+    /**
+     * NOTE: Time unit of timestamp here is microsecond (see {@link
+     * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use seconds.
+     */
+    @Override
+    public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+        FileStoreTable table = loadAndCheck(ident);
+        // Paimon's timestamp use millisecond
+        timestamp = timestamp / 1000;
+
+        LOG.info("Time travel target timestamp is {} milliseconds.", timestamp);
+
+        Options option =
+                new Options()
+                        .set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FROM_TIMESTAMP)

Review Comment:
   Don't set scan mode.
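   
   Likewise here: only the timestamp option needs to be set (the quoted code already converts Spark's microseconds to Paimon's milliseconds before this point), again assuming the scan mode can be inferred:
   
   ```java
   Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
   ```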



##########
paimon-spark/paimon-spark-3.3/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.spark.SparkCatalogBase;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark {@link TableCatalog} for paimon. */
+public class SparkCatalog extends SparkCatalogBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+
+    @Override
+    public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
+        Table table = loadAndCheck(ident);
+        long snapshotId;
+
+        try {
+            snapshotId = Long.parseUnsignedLong(version);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Version for time travel should be a LONG value representing snapshot id but was '%s'.",
+                            version),
+                    e);
+        }
+
+        LOG.info("Time travel target snapshot id is {}.", snapshotId);
+
+        Options dynamicOptions =
+                new Options()
+                        .set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FROM_SNAPSHOT)
+                        .set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
+        return new SparkTable(
+                table.copy(dynamicOptions.toMap()),
+                Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+    }
+
+    /**
+     * NOTE: Time unit of timestamp here is microsecond (see {@link
+     * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use seconds.
+     */
+    @Override
+    public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+        FileStoreTable table = loadAndCheck(ident);
+        // Paimon's timestamp use millisecond
+        timestamp = timestamp / 1000;
+
+        LOG.info("Time travel target timestamp is {} milliseconds.", timestamp);
+
+        Options option =
+                new Options()
+                        .set(CoreOptions.SCAN_MODE, CoreOptions.StartupMode.FROM_TIMESTAMP)
+                        .set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
+        return new SparkTable(
+                table.copy(option.toMap()),
+                Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
+    }
+
+    private FileStoreTable loadAndCheck(Identifier ident) throws NoSuchTableException {
+        try {
+            Table table = load(ident);
+            if (!(table instanceof FileStoreTable)) {

Review Comment:
   For all `DataTable`s?
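   
   A sketch of the widened check this question points at, assuming `DataTable` is the broader Paimon interface covering all readable tables (the method's return type would widen accordingly):
   
   ```java
   Table table = load(ident);
   if (!(table instanceof DataTable)) { // widened from FileStoreTable
       throw new UnsupportedOperationException(
               String.format("Time travel is not supported for table %s.", ident));
   }
   return (DataTable) table;
   ```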





[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #659: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF)

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #659:
URL: https://github.com/apache/incubator-paimon/pull/659#discussion_r1144219456


##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -53,7 +55,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
 
     protected final FileIO fileIO;
     protected final Path path;
-    protected final TableSchema tableSchema;
+    protected TableSchema tableSchema;

Review Comment:
   `tableSchema` should be an immutable object.
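   
   In other words, a sketch of the intended shape: keep the field `final` and let time travel produce a new table instance instead of reassigning the schema (the `copy` overload here is hypothetical):
   
   ```java
   protected final TableSchema tableSchema; // never reassigned after construction
   
   // hypothetical: time travel returns a fresh table bound to the historical schema
   FileStoreTable copy(TableSchema newTableSchema);
   ```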





[GitHub] [incubator-paimon] JingsongLi merged pull request #659: [spark] Support time travel for Spark 3.3 (VERSION AS OF and TIMESTAMP AS OF)

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #659:
URL: https://github.com/apache/incubator-paimon/pull/659

