Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/03 02:28:20 UTC

[GitHub] [iceberg] flyrain opened a new pull request, #6350: Query changelog table with a timestamp range

flyrain opened a new pull request, #6350:
URL: https://github.com/apache/iceberg/pull/6350

   Per the discussion in #6012, I created this PR to support time-range queries on the changelog table.
   cc @aokolnychyi @RussellSpitzer @szehon-ho @rdblue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046534226


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot that was committed either at or after a given time.

Review Comment:
   The first part of the comment is confusing. It should at least be something like this:
   ```
   Traverses the table's snapshot history
   ```
   Or, as in other methods, just:
   ```
   Traverses the history
   ```
   Moreover, I don't think it is necessary to mention that. WDYT?
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044836296


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {

Review Comment:
   I would probably split out the invalid cases into another test? Not a big deal though





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044874538


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(snap2.timestampMillis() + 1, snap3.timestampMillis()));

Review Comment:
   Good point. I will use `waitUntilAfter()` for that.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046536345


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -213,13 +214,26 @@ public Scan build() {
         SparkReadOptions.END_SNAPSHOT_ID,
         SparkReadOptions.START_SNAPSHOT_ID);
 
+    checkInvalidOptions();
+
     if (startSnapshotId != null) {
       return buildIncrementalAppendScan(startSnapshotId, endSnapshotId);
     } else {
       return buildBatchScan(snapshotId, asOfTimestamp, branch, tag);
     }
   }
 
+  private void checkInvalidOptions() {
+    Long startTimestamp = readConf.startTimestamp();

Review Comment:
   I thought it'd be more readable. No? I'm OK with either.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1045099019


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);

Review Comment:
   > Is it necessary to do that, since oldestAncestorAfter(startTimestamp) will return the snapshot whose timestamp is equal to startTimestamp?
   
   I have re-checked and you are right. I raised https://github.com/apache/iceberg/pull/6401 to try to unify them.
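
The inclusive lookup discussed in this thread can be illustrated with a small stand-alone sketch. It does not call the Iceberg API: `Snap` and `oldestAtOrAfter` are hypothetical names, the list stands in for a table's ancestor chain ordered newest first, and timestamps are assumed to decrease along that chain.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a snapshot: an ID plus a commit time in milliseconds.
final class Snap {
  final long id;
  final long timestampMillis;

  Snap(long id, long timestampMillis) {
    this.id = id;
    this.timestampMillis = timestampMillis;
  }
}

final class OldestSnapshotSketch {
  private OldestSnapshotSketch() {}

  // Walks the ancestors from newest to oldest and returns the oldest one that
  // was committed at or after the given time, or null when none qualifies.
  static Snap oldestAtOrAfter(List<Snap> newestFirst, long timestampMillis) {
    Snap result = null;
    for (Snap snap : newestFirst) {
      if (snap.timestampMillis < timestampMillis) {
        break; // assumed ordering: every remaining ancestor is older still
      }
      result = snap;
    }
    return result;
  }

  public static void main(String[] args) {
    List<Snap> history =
        Arrays.asList(new Snap(3, 300L), new Snap(2, 200L), new Snap(1, 100L));
    // A timestamp equal to a commit time selects that snapshot (inclusive).
    System.out.println(oldestAtOrAfter(history, 200L).id);
    // A timestamp between two commits selects the later one.
    System.out.println(oldestAtOrAfter(history, 201L).id);
    // A timestamp after the newest commit selects nothing.
    System.out.println(oldestAtOrAfter(history, 301L));
  }
}
```

Because an exact timestamp match is already returned by the inclusive walk, a separate "look for an equal timestamp first" step would not change the result in this sketch.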
   
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048731625


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   I think you are correct, @flyrain. I would only really want the error for 1.





[GitHub] [iceberg] flyrain merged pull request #6350: Spark 3.3: Time range query of changelog tables

Posted by GitBox <gi...@apache.org>.
flyrain merged PR #6350:
URL: https://github.com/apache/iceberg/pull/6350




[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1045128513


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);

Review Comment:
   Yeah, I was confused by the name at the beginning as well.   





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044897305


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.START_SNAPSHOT_ID,
+        SparkReadOptions.START_TIMESTAMP);
+
+    Preconditions.checkArgument(
+        !(endSnapshotId != null && endTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.END_SNAPSHOT_ID,
+        SparkReadOptions.END_TIMESTAMP);
+
+    if (startTimestamp != null && endTimestamp != null) {
+      Preconditions.checkArgument(
+          startTimestamp < endTimestamp,
+          "Cannot set %s to be greater than %s for changelogs",
+          SparkReadOptions.START_TIMESTAMP,
+          SparkReadOptions.END_TIMESTAMP);
+    }
+
+    if (startTimestamp != null) {
+      startSnapshotId = getStartSnapshotId(startTimestamp);
+    }
+
+    if (endTimestamp != null) {
+      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+    }

Review Comment:
   Good point! Added. That said, we could support time ranges in incremental scans as well in the future.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044896124


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -150,7 +150,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
 
   /**
    * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * after the given time(inclusive).

Review Comment:
   Made the change





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044841503


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(

Review Comment:
   Another way to make this test a bit clearer would be to define
   ```
   Snapshot1Rows = ImmutableList.of(row(....
   Snapshot2Rows = ...
   ```
   then add on a helper
   ```
   rowsWithValue(Snapshot1Rows, snap1.snapshotID)
   ```
   
   
   Then this test kind of goes like
   ```
   insertRows(Snapshot1Rows)
   insertRows(Snapshot2Rows)
   insertRows(Snapshot3Rows)
   
   check(expected = rowsWithValue(Snapshot1Rows, snap1.snapshotID))
   ```
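
The helper suggested above could look roughly like the following stand-alone sketch. It is only an illustration under assumptions: rows are modeled as plain `Object[]` arrays rather than the test suite's own row type, and the class name and the snapshot ID value are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChangelogRowsSketch {
  private ChangelogRowsSketch() {}

  // Returns copies of the given rows with the snapshot ID appended as a last
  // column, so expected rows can be declared once and reused per snapshot.
  static List<Object[]> rowsWithValue(List<Object[]> rows, long snapshotId) {
    List<Object[]> result = new ArrayList<>();
    for (Object[] row : rows) {
      Object[] extended = Arrays.copyOf(row, row.length + 1);
      extended[row.length] = snapshotId;
      result.add(extended);
    }
    return result;
  }

  public static void main(String[] args) {
    // Rows without the snapshot ID column: id, data, change type, ordinal.
    List<Object[]> snapshot3Rows = new ArrayList<>();
    snapshot3Rows.add(new Object[] {2, "b", "DELETE", 0});
    snapshot3Rows.add(new Object[] {-2, "b", "INSERT", 0});

    long hypotheticalSnapshotId = 42L; // stands in for snap3.snapshotId()
    for (Object[] row : rowsWithValue(snapshot3Rows, hypotheticalSnapshotId)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

The input rows are copied rather than modified, so the same row list can be used both for inserting data and for building the expected changelog output.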





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044323786


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);

Review Comment:
   Spark handles `startTimestamp` differently from Flink. I wonder if we should unify them?
   
   Flink first checks whether there is a snapshot whose timestamp equals `startTimestamp`. If there is none, it uses `SnapshotUtil.oldestAncestorAfter`.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";

Review Comment:
   Yes, it's called "start-snapshot-timestamp" in Flink.
   
   https://github.com/apache/iceberg/blob/6d47097151b4df1d3269563a7ebfdb4b6c270a64/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java#L48-L49
   





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046541628


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot that was committed either at or after a given time.

Review Comment:
   Rephrased a bit. Please take a look. Any suggestion is welcome.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Spark 3.3: Time range query of changelog tables

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1054640659


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot of a table that was committed either at or after a given time.

Review Comment:
   Sure, will do.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Spark 3.3: Time range query of changelog tables

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1054901416


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot of a table that was committed either at or after a given time.

Review Comment:
   Filed https://github.com/apache/iceberg/pull/6479





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1038729684


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   We don't have to throw an exception here. Another option is to return an empty set of rows.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1048750874


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(

Review Comment:
   Not a big deal if we aren't using this a lot. If we end up writing more tests like this in the future, though, I think it would be a good utility.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046541174


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -213,13 +214,26 @@ public Scan build() {
         SparkReadOptions.END_SNAPSHOT_ID,
         SparkReadOptions.START_SNAPSHOT_ID);
 
+    checkInvalidOptions();
+
     if (startSnapshotId != null) {
       return buildIncrementalAppendScan(startSnapshotId, endSnapshotId);
     } else {
       return buildBatchScan(snapshotId, asOfTimestamp, branch, tag);
     }
   }
 
+  private void checkInvalidOptions() {
+    Long startTimestamp = readConf.startTimestamp();

Review Comment:
   Inlined in the new commit.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1043984156


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -150,7 +150,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
 
   /**
    * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * after the given time(inclusive).

Review Comment:
   Nit: space before (inclusive)



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(snap2.timestampMillis() + 1, snap3.timestampMillis()));

Review Comment:
   Should we use the mechanism of TestExpireSnapshot::rightAfterSnapshot? I think it's a bit more readable.
   It also guarantees that the snapshot timestamps are not the same (though that's unlikely), which could otherwise lead to weird failures.
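
The idea behind waiting until the clock has passed a snapshot's commit time can be sketched as follows. This is a minimal illustration, not the Iceberg test utility itself: the class name is hypothetical, and the busy-wait on `System.currentTimeMillis()` is an assumption about how such a helper might work.

```java
final class WaitUntilAfterSketch {
  private WaitUntilAfterSketch() {}

  // Spins until the wall clock is strictly later than the given time and
  // returns the first clock reading that satisfies this.
  static long waitUntilAfter(long timestampMillis) {
    long current = System.currentTimeMillis();
    while (current <= timestampMillis) {
      current = System.currentTimeMillis();
    }
    return current;
  }

  public static void main(String[] args) {
    // Stands in for snapshot.timestampMillis() taken right after a commit.
    long commitTime = System.currentTimeMillis();
    long rightAfter = waitUntilAfter(commitTime);
    System.out.println(rightAfter > commitTime);
  }
}
```

A test could call such a helper after each commit and use the returned value as a range boundary, so that two consecutive snapshots never share a timestamp and the boundary does not depend on arithmetic like `timestampMillis() + 1`.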



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",

Review Comment:
   Too many negatives; I'm a bit confused. Does this capture it? "Cannot set both %s and %s"
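
A check with the suggested wording might look like the sketch below. It uses plain Java instead of the `Preconditions` helper from the diff so that it is self-contained; the class and method names are hypothetical, and the option strings are passed in as ordinary arguments.

```java
final class ChangelogOptionsSketch {
  private ChangelogOptionsSketch() {}

  // Rejects the case where both a snapshot ID and a timestamp are supplied
  // for the same end of the range; either one alone (or neither) is accepted.
  static void checkNotBoth(Long snapshotId, Long timestamp, String idOption, String tsOption) {
    if (snapshotId != null && timestamp != null) {
      throw new IllegalArgumentException(
          String.format("Cannot set both %s and %s for changelogs", idOption, tsOption));
    }
  }

  public static void main(String[] args) {
    // Only one of the two options is set, so this passes silently.
    checkNotBoth(1L, null, "start-snapshot-id", "start-timestamp");

    try {
      checkNotBoth(1L, 100L, "start-snapshot-id", "start-timestamp");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Phrasing the message positively ("both ... and") matches the condition being tested, which is that the two options are set together.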



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   To me, it may be slightly more user-friendly to return an empty set. But I'm open to either.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";

Review Comment:
   Looks like Flink has a config called 'start-snapshot-timestamp'. It's a bit wordy, but I'm wondering if we need to synchronize the two (if they are related). cc @stevenzwu @pvary @hililiwei 



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.START_SNAPSHOT_ID,
+        SparkReadOptions.START_TIMESTAMP);
+
+    Preconditions.checkArgument(
+        !(endSnapshotId != null && endTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.END_SNAPSHOT_ID,
+        SparkReadOptions.END_TIMESTAMP);
+
+    if (startTimestamp != null && endTimestamp != null) {
+      Preconditions.checkArgument(
+          startTimestamp < endTimestamp,
+          "Cannot set %s to be greater than %s for changelogs",
+          SparkReadOptions.START_TIMESTAMP,
+          SparkReadOptions.END_TIMESTAMP);
+    }
+
+    if (startTimestamp != null) {
+      startSnapshotId = getStartSnapshotId(startTimestamp);
+    }
+
+    if (endTimestamp != null) {
+      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+    }

Review Comment:
   Do we need to add corresponding checks in build() for not setting these new properties (like we check today for start-snapshot-id)?





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044826932


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);

Review Comment:
   This section of code is identical to the test above and the one below.
   
   We probably need some helpers to reduce repeated code in the suite (I also think this helps focus the test code on exactly what is being tested), for example:
   createTable()
   insertDefaultRows()





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044825312


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -150,7 +150,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
 
   /**
    * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * after the given time(inclusive).

Review Comment:
   I'm not sure what "inclusive" means in this context. Maybe we just need to move this around to
   "finds the oldest snapshot committed at or after the given snapshot"? I dunno
   "finds the oldest snapshot that was committed either at or after a given time"?
   
   Or ignore me, I just always find the time docs difficult to understand.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044901322


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);

Review Comment:
   Is it inclusive for the `startTimestamp` in Flink?
   >Spark processes startTimestamp logically differently from Flink. I wonder if we should unify them?
   
   Is it necessary to do that, since `oldestAncestorAfter(startTimestamp)` will return the snapshot whose timestamp equals `startTimestamp`?
   >In Flink, it will first find whether there is a snapshot, its time is equal startTimestamp. If not, SnapshotUtil.oldestAncestorAfter will be used.
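
A simplified sketch of the at-or-after lookup being discussed, to show why a separate equality check would be redundant under that semantics. The `AncestorLookup` class and its `Snap` type are hypothetical stand-ins, not the `SnapshotUtil` implementation, and the sketch assumes commit times do not increase when walking from a snapshot to its parent.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration; this is not the SnapshotUtil implementation.
final class AncestorLookup {
  // Minimal snapshot model: id, parent id (null for the first commit), commit time.
  static final class Snap {
    final long id;
    final Long parentId;
    final long timestampMillis;

    Snap(long id, Long parentId, long timestampMillis) {
      this.id = id;
      this.parentId = parentId;
      this.timestampMillis = timestampMillis;
    }
  }

  private AncestorLookup() {}

  // Walks from the current snapshot back through its parents and returns the oldest
  // ancestor committed at or after the given time, or null if there is none.
  // Assumes commit times do not increase when moving from child to parent.
  static Snap oldestAncestorAtOrAfter(Map<Long, Snap> byId, long currentId, long timestampMillis) {
    Snap oldest = null;
    Snap cursor = byId.get(currentId);
    while (cursor != null && cursor.timestampMillis >= timestampMillis) {
      oldest = cursor;
      cursor = cursor.parentId == null ? null : byId.get(cursor.parentId);
    }
    return oldest;
  }
}
```

Because the comparison is `>=`, a snapshot whose commit time equals the requested timestamp is returned by the walk itself. The walk also only visits ancestors of the current snapshot, so snapshots outside that history are never considered.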
   





[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1344870088

   Hi @szehon-ho, @RussellSpitzer, @hililiwei thanks for the review! Resolved your comments. Take another look?




[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046534226


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot that was committed either at or after a given time.

Review Comment:
   The first part of the comment is confusing; it should be at least something like this:
   ```
   Traverses the table's snapshot history
   ```
   Moreover, I think it is not necessary to mention that. WDYT?
   





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1046518537


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot that was committed either at or after a given time.

Review Comment:
   Nit: is it necessary to change the whole sentence? How about just replacing 'after' with 'at or after'?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -213,13 +214,26 @@ public Scan build() {
         SparkReadOptions.END_SNAPSHOT_ID,
         SparkReadOptions.START_SNAPSHOT_ID);
 
+    checkInvalidOptions();
+
     if (startSnapshotId != null) {
       return buildIncrementalAppendScan(startSnapshotId, endSnapshotId);
     } else {
       return buildBatchScan(snapshotId, asOfTimestamp, branch, tag);
     }
   }
 
+  private void checkInvalidOptions() {
+    Long startTimestamp = readConf.startTimestamp();

Review Comment:
   Nit: maybe just inline this if it's only used in one place (unless I'm missing something)? It doesn't make sense to have a helper method called 'checkInvalidOptions' that only adds one specific check.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   Having an exception to distinguish the two cases is a fair point. Initially I was thinking it would be a pain for users to have to deal with an exception if they wanted all changes from a random time, but they can find the earliest snapshot and make sure the timestamp is appropriate to avoid the exception. So still no real preference, ok either way.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6350: Spark 3.3: Time range query of changelog tables

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1054610098


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -149,8 +149,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
   }
 
   /**
-   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * Finds the oldest snapshot of a table that was committed either at or after a given time.

Review Comment:
   @flyrain, this change is incorrect. This is not the oldest known snapshot, it is the oldest known snapshot in the current snapshot's history. Can you fix it?





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044857145


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   My question would be: do we need to differentiate between "a snapshot existed yet had no changes" and "no such snapshot existed"? I think I'm good with the exception because it makes it clear you aren't getting results because of the input parameters and not because of the table.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044897410


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",

Review Comment:
   Made the change.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044899026


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(

Review Comment:
   Tried it. We have to change the `_change_ordinal` as well. It needs a bit more logic to make it work, and it may not be worth it considering there are only about 3 test cases using that. WDYT?





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044897546


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);

Review Comment:
   Made the change. Thanks for the suggestion.



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {

Review Comment:
   Done.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044897001


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";

Review Comment:
   Thanks @szehon-ho and @hililiwei for the input. It'd be nice to consolidate. I kind of prefer the name `start-timestamp` here, so that users don't have to worry about the concept of a `snapshot`; the query is purely based on the time range.





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1044906141


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   > do we need to differentiate between a snapshot existed yet had no changes and no such snapshot existed.
   
   I will consider these two use cases.
   1. Query with snapshot id. We should differentiate it, throwing an exception makes sense here.
   2. Query only with time range. We may assume the user doesn't have to know the concept of `snapshot`. We may not differentiate it, and returning an empty set makes more sense.
   
   Subtle difference though. I am fine with either option.
   





[GitHub] [iceberg] flyrain commented on a diff in pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1038666462


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";
+
+  // End timestamp used in multi-snapshot scans (inclusive)
+  public static final String END_TIMESTAMP = "end-timestamp";

Review Comment:
   The other option here is to reuse `as-of-timestamp` as the end timestamp, but I prefer the symmetric options (`start-timestamp`, `end-timestamp`). The UX would be better this way, I assume.





[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1351909259

   Thanks @RussellSpitzer. Hi @szehon-ho @hililiwei, please let me know if you have any comments. Thanks!




[GitHub] [iceberg] flyrain commented on pull request #6350: Query changelog table with a timestamp range

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#issuecomment-1352094105

   Thanks @szehon-ho for the review. I believe you are talking about case 2 in https://github.com/apache/iceberg/pull/6350#discussion_r1044906141. I did try to return an empty set, but the change seems a bit involved. Possible solutions would be:
   1. Add a new API to the `IncrementalScan` interface to return an empty set.
   2. Reuse `IncrementalScan::fromSnapshotExclusive` by passing the latest/current snapshot id. This is a workaround without an interface change. I tried it, and it needs to touch a bunch of code.
   
   Let me know if there is an easier way to do that. Otherwise, I will make the change in another PR.

