You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/12 23:18:09 UTC

[GitHub] [iceberg] wypoon opened a new pull request #3722: Spark: Use snapshot schema when reading snapshot

wypoon opened a new pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722


   This has been implemented for Spark 2 in #1508. For Spark 3, Ryan Blue proposed a syntax for adding the snapshot id or timestamp to the table identifier in #3269. Here we implement the Spark 3 support for using the snapshot schema by using the proposed table identifier syntax. This is until a new version of Spark 3 is released with support for `AS OF` in Spark SQL.
   Note: The table identifier syntax is for internal use only (as in this implementation) and not meant to be exposed as a publicly supported syntax in SQL. However, for testing, we do test its use from SQL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770134655



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +120,42 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test

Review comment:
       Added test cases for reading with both snapshot-id and as-of-timestamp, inserting into a table at a specific snapshot, and deleting from a table at a specific snapshot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770142597



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +124,82 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test
+  public void testSnapshotInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get the snapshot ID of the last write and get the current row set as expected
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    String prefix = "snapshot_id_";
+    // read the table at the snapshot
+    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId);
+    assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual);
+
+    // read the table using DataFrameReader option
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
+        .load(tableName);
+    List<Object[]> fromDF = rowsToJava(df.collectAsList());
+    assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
+  }
+
+  @Test
+  public void testTimestampInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get a timestamp just after the last write and get the current row set as expected
+    long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2;

Review comment:
       Can you use `waitUntilAfter` defined in `SparkTestBase` to avoid flaky tests?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-995401168


   @rdblue thanks for all the reviews. I adopted your suggestion around `SparkTable#newScanBuilder` and `SparkTable.addSnapshotId`, and made the other tweaks you suggested.
   @jackye1995 thanks for your reviews and suggestions too.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287090



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,12 +105,26 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+    if (asOfTimestamp != null) {

Review comment:
       Add blank lines.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770146014



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +120,42 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test

Review comment:
       These look good to me. Thanks for adding them!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-996907561


   @rdblue @jackye1995 if this can be merged, I'll prepare PRs for Spark 3.1 and 3.0 for porting it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287615



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {
+    if (newName.equals("")) {
+      return ident;
+    } else {
+      String[] namespace = ident.namespace();
+      String name = ident.name();

Review comment:
       Skipped creating the variable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287676



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +168,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long getPropertyAsLong(CaseInsensitiveStringMap options, String property) {

Review comment:
       Renamed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770134769



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +105,54 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+
+    String selector = null;
+
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+
+    if (asOfTimestamp != null) {
+      selector = AT_TIMESTAMP + asOfTimestamp;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
     if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
+      String newPath = (selector == null) ? path : path + "#" + selector;
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(path));
+          new PathIdentifier(newPath));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
         "path or identifier", spark, path);
 
+    Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
+          ident);
     } else {
-      return catalogAndIdentifier;
+      return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(),
+          ident);

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r769268392



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +166,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
+    String value = options.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }

Review comment:
       Will add a blank line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287090



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,12 +105,26 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+    if (asOfTimestamp != null) {

Review comment:
       Added blank lines.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r769155230



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +166,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
+    String value = options.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }

Review comment:
       nit: newline after if

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +120,42 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test

Review comment:
       I think we are missing a few failure test cases:
   1. Cannot specify both snapshot-id and as-of-timestamp
   2. Cannot write from table at a specific snapshot
   3. Cannot delete from table at a specific snapshot




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon edited a comment on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon edited a comment on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-996907561


   @rdblue @jackye1995 if this can be merged, I'll prepare PRs for Spark 3.1 and 3.0 for porting it. I'll be on vacation for the next two weeks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-996909687


   Thanks, @wypoon! Nice work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r769269845



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +120,42 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test

Review comment:
       Ack. I agree that it'd be good to have such test cases. I'd point out though that none of the above should be supported even before this change, so if the test cases don't exist, they are existing holes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-992774597


   Skimmed through this, mostly look good to me as most of the content was reviewed once in the original PR, I will take another deeper look in the afternoon.
   
   And FYI, I am also adding the time travel support in Trino (https://github.com/trinodb/trino/pull/10258), I will add another PR to match this behavior.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770161465



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       Let me try it out.
   I had run into a problem with the original addSnapshot function and always calling it. After I analysed what was happening, I wrote that comment to remind myself. I therefore called addSnapshot only when strictly necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770161465



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       Let me try it out.
   I had run into a problem with the original `addSnapshotId` function and always calling it. After I analysed what was happening, I wrote that comment to remind myself. I therefore called addSnapshot only when strictly necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768059097



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -89,15 +93,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) {
     this(icebergTable, null, refreshEagerly);
   }
 
-  public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) {
+  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
     this.icebergTable = icebergTable;
-    this.requestedSchema = requestedSchema;
+    this.snapshotId = snapshotId;
     this.refreshEagerly = refreshEagerly;
-
-    if (requestedSchema != null) {
-      // convert the requested schema to throw an exception if any requested fields are unknown
-      SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
-    }

Review comment:
       I pointed this out in #1508 and I'll point it out again here:
   
   I removed requestedSchema from SparkTable because with #1783, the Spark 3 IcebergSource changed to be a SupportsCatalogOptions, not just a TableProvider. Since DataFrameReader does not support specifying a schema when reading from an IcebergSource:
   ```
       DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
         ...
         val (table, catalog, ident) = provider match {
           case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
             throw new IllegalArgumentException(
               s"$source does not support user specified schema. Please don't specify the schema.")
   ```
   (see https://github.com/apache/spark/blob/v3.2.0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L220-L223)
   there is no reason to have a requestedSchema field as we cannot make use of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768059097



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -89,15 +93,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) {
     this(icebergTable, null, refreshEagerly);
   }
 
-  public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) {
+  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
     this.icebergTable = icebergTable;
-    this.requestedSchema = requestedSchema;
+    this.snapshotId = snapshotId;
     this.refreshEagerly = refreshEagerly;
-
-    if (requestedSchema != null) {
-      // convert the requested schema to throw an exception if any requested fields are unknown
-      SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
-    }

Review comment:
       I pointed this out in #1508 and I'll point it out again here:
   
   I removed `requestedSchema` from `SparkTable` because with #1783, the Spark 3 `IcebergSource` changed to be a `SupportsCatalogOptions`, not just a `TableProvider`. Since `DataFrameReader` does not support specifying a schema when reading from an `IcebergSource`:
   ```
       DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
         ...
         val (table, catalog, ident) = provider match {
           case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
             throw new IllegalArgumentException(
               s"$source does not support user specified schema. Please don't specify the schema.")
   ```
   (see https://github.com/apache/spark/blob/v3.2.0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L220-L223)
   there is no reason to have a `requestedSchema` field as we cannot make use of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768098662



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +287,35 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options) {

Review comment:
       I think this should match the version from #3269: https://github.com/apache/iceberg/pull/3269/files#diff-31cb389fec0f0d8d4bc9ae93b6ed7624aaefb064adecde1da37ebcf8ecf748acR287-R300
   
   There is no need to access all of the state information here. This method's responsibility is to create a new `options` map with the correct snapshot, not to decide what happens in the scan builder.
   
   The snapshot ID to scan should be resolved in the caller, `newScanBuilder`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770141026



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
##########
@@ -1141,6 +1155,223 @@ public void testPartitionsTable() {
     }
   }
 
+  @Test
+  public synchronized void testSnapshotReadAfterAddColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshot1 = table.currentSnapshot();

Review comment:
       A better name would be `beforeAddColumn`. In general, I think adding numbers to a generic name is not a good practice for readable tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770140418



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +299,20 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
+      Preconditions.checkArgument(snapshotIdFromOptions == null,
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));

Review comment:
       This should also remove `as-of-timestamp` since `snapshot-id` is being set. I think I missed that in my PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770182221



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       Ok, I see that you changed
   ```
         Preconditions.checkArgument(snapshotIdFromOptions == null,
             "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
   ```
   to
   ```
         Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotId.toString().equals(snapshotIdFromOptions),
             "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
   ```
   in `addSnapshotId`.
   With the old version, you should only call `addSnapshotId` if the `options` did not already have `snapshot-id` or `as-of-timestamp`.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770159329



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +299,20 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
+      Preconditions.checkArgument(snapshotIdFromOptions == null,
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));

Review comment:
       Thanks, that makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770139467



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       I'm not sure this is worth the complexity. Why not just always add the snapshot ID?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770068550



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +105,54 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+
+    String selector = null;
+
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+
+    if (asOfTimestamp != null) {
+      selector = AT_TIMESTAMP + asOfTimestamp;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
     if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
+      String newPath = (selector == null) ? path : path + "#" + selector;
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(path));
+          new PathIdentifier(newPath));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
         "path or identifier", spark, path);
 
+    Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
+          ident);
     } else {
-      return catalogAndIdentifier;
+      return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(),
+          ident);

Review comment:
       Nit: looks like some of these args don't need to be wrapped to the next line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-992748945


   Yes agree, I think we need to include this for 0.13 consistent experience in 3.x and 2.4. Please let me know if anyone is against it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768101609



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
##########
@@ -65,16 +68,22 @@
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.snapshotId = readConf.snapshotId();
+    this.asOfTimestamp = readConf.asOfTimestamp();
     this.caseSensitive = readConf.caseSensitive();
   }
 
+  private Schema snapshotSchema() {

Review comment:
       I think the schema should be passed into this builder, not resolved here. The problem with this is that Spark has already analyzed the query using the schema returned by `SparkTable`. Whatever `SparkTable` reported as the schema must be what this class uses as the basis for projection, or else Iceberg could break resolution -- and that's worse than using a different projection schema.
   
   For cases where `snapshot-id` or `as-of-timestamp` are passed through read options, I think we have to use the current table schema.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768102151



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +185,24 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options));

Review comment:
       I think this method needs to determine the base schema (whatever was returned by `schema()`) and pass that into the scan builder so that the scan is consistent with analysis that's already happened using just the table instance.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287794



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +287,35 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options) {

Review comment:
       Reverted to your version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-992790057


   @aokolnychyi can you take a look too, in case it conflicts with any changes you're making?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768092154



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {

Review comment:
       I think this mixes the identifier logic in too many places. The identifier should be independent of the `if`/`else` blocks here that determine catalog. We don't need to check the selector in `newIdentifier` as well as the data that the selector is dependent on here.
   
   Instead, I think this should create the new identifier using `identifierWithSelector(catalogAndIdentifier.identifier(), selector)` and then pass that identifier through this block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768092584



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {

Review comment:
       This isn't very descriptive. I would rather call it `identifierWithSelector` and rename `newName` to `selector`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768089151



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,12 +105,26 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";

Review comment:
       Instead of using an empty string, I think it is better to use `null` as a signalling value. Then you can use `!=` null when creating the path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770139467



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       I'm not sure this is worth the complexity. Why not just always add the snapshot ID if `snapshotId` is set? We know that if it is set, the option `snapshot-id` or `as-of-timestamp` should correspond to it. We should just make sure that the given snapshot ID is set in the options and remove `as-of-timestamp` if it is set. That makes this whole block simpler:
   
   ```
   CaseInsensitiveStringMap scanOptions = snapshotId != null ? addSnapshotId(options, snapshotId) : options;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770192289



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +124,82 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test
+  public void testSnapshotInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get the snapshot ID of the last write and get the current row set as expected
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    String prefix = "snapshot_id_";
+    // read the table at the snapshot
+    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId);
+    assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual);
+
+    // read the table using DataFrameReader option
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
+        .load(tableName);
+    List<Object[]> fromDF = rowsToJava(df.collectAsList());
+    assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
+  }
+
+  @Test
+  public void testTimestampInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get a timestamp just after the last write and get the current row set as expected
+    long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2;

Review comment:
       I added a `waitUntilAfter`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770134237



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +166,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
+    String value = options.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }

Review comment:
       Added a blank line.
   What is the rationale for always adding a blank line after an if?
   I fail to see how this makes the code more readable.
   I can understand breaking a large block of code up with blank lines in general, but this is a very short method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768088466



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,12 +105,26 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+    if (asOfTimestamp != null) {

Review comment:
       Nit: please add a newline between control flow blocks and the following statement




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770147784



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       With the update to `addSnapshotId` below, this worked fine with tests:
   
   ```java
   CaseInsensitiveStringMap scanOptions = addSnapshotId(options, snapshotId);
   ```
   
   It didn't need the null check because that's done inside `addSnapshotId`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770147913



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +299,20 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
+      Preconditions.checkArgument(snapshotIdFromOptions == null,
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));

Review comment:
       I updated this to the following and tests work fine:
   
   ```java
     private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
       if (snapshotId != null) {
         String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
         Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotId.toString().equals(snapshotIdFromOptions),
             "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
   
         Map<String, String> scanOptions = Maps.newHashMap();
         scanOptions.putAll(options.asCaseSensitiveMap());
         scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
         scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
   
         return new CaseInsensitiveStringMap(scanOptions);
       }
   
       return options;
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770161465



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +184,37 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
+    // or option("as-of-timestamp", <timestamp>)  is applied to the DataFrameReader, SparkTable will be
+    // constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
+    // with the options, which will include "snapshot-id" or "as-of-timestamp".
+    // On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
+    // or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
+    // SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
+    // We therefore add a "snapshot-id" option here in this latter case.
+    CaseInsensitiveStringMap scanOptions =

Review comment:
       Let me try it out.
   I had run into a problem with the original `addSnapshotId` function and always calling it. After I analysed what was happening, I wrote that comment to remind myself. I therefore called `addSnapshotId` only when strictly necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770139375



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +166,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
+    String value = options.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }

Review comment:
       Yes agree. I think it's mostly just general codestyle rules the community follows, maybe we should just put these into checkstyle instead of being human linters




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770134655



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
##########
@@ -120,4 +120,42 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test

Review comment:
       Added test cases for reading with both snapshot-id and as-of-timestamp, writing to a table at a specific snapshot, and deleting from a table at a specific snapshot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-995342861


   Thanks, @wypoon! This looks great I think we can get it in with a couple minor changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768197829



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {
+    if (newName.equals("")) {
+      return ident;
+    } else {
+      String[] namespace = ident.namespace();
+      String name = ident.name();

Review comment:
       It is used below though. Do you mean just use `ident.name()` below where I used `name`? I can do that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768288721



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
##########
@@ -65,16 +68,22 @@
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.snapshotId = readConf.snapshotId();
+    this.asOfTimestamp = readConf.asOfTimestamp();
     this.caseSensitive = readConf.caseSensitive();
   }
 
+  private Schema snapshotSchema() {

Review comment:
       I now pass the `Schema` into the `SparkScanBuilder` when constructing it (and added a `SparkScanBuilder `constructor that takes a `Schema`) in `SparkTable#newScanBuilder`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-992026429


   @rdblue @jackye please take a look. This incorporates #3269 (updated). It would be nice if this could make it into 0.13, as using the snapshot schema is already implemented in the Spark 2.4 support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon edited a comment on pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon edited a comment on pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#issuecomment-992026429


   @rdblue @jackye1995  please take a look. This incorporates #3269 (updated). It would be nice if this could make it into 0.13, as using the snapshot schema is already implemented in the Spark 2.4 support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768092834



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {
+    if (newName.equals("")) {
+      return ident;
+    } else {
+      String[] namespace = ident.namespace();
+      String name = ident.name();

Review comment:
       This variable doesn't seem to be needed to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768093383



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -132,6 +168,14 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long getPropertyAsLong(CaseInsensitiveStringMap options, String property) {

Review comment:
       How about `propertyAsLong` instead? There isn't much value in `get` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287214



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,12 +105,26 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = getPropertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = getPropertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";

Review comment:
       Adopted your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768287372



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {

Review comment:
       Simplified according to your suggestion.

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -115,10 +133,28 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {

Review comment:
       Renamed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wypoon commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r768289789



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -186,22 +185,24 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options));

Review comment:
       `SparkTable#schema` already returns `snapshotSchema()` converted to Spark's `StructType`. Here, as you suggest, I construct the `SparkScanBuilder` with the `snapshotSchema()`. So we are consistent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3722: Spark: Use snapshot schema when reading snapshot

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770147913



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -286,4 +299,20 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
+      Preconditions.checkArgument(snapshotIdFromOptions == null,
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));

Review comment:
       I updated this to the following and tests work fine:
   
   ```java
     private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
       if (snapshotId != null) {
         String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
         Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotIdFromOptions.equals(snapshotId.toString()),
             "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
   
         Map<String, String> scanOptions = Maps.newHashMap();
         scanOptions.putAll(options.asCaseSensitiveMap());
         scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
         scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
   
         return new CaseInsensitiveStringMap(scanOptions);
       }
   
       return options;
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org