Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/09 02:07:21 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6350: Query changelog table with a timestamp range

szehon-ho commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1043984156


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -150,7 +150,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
 
   /**
    * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * after the given time(inclusive).

Review Comment:
   Nit: space before (inclusive)



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(snap2.timestampMillis() + 1, snap3.timestampMillis()));

Review Comment:
   Should we use the mechanism of TestExpireSnapshot::rightAfterSnapshot? I think it's a bit more readable.
   It also guarantees the snapshot timestamps are not the same (though that's unlikely), which could otherwise lead to weird failures.
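   For reference, a rough sketch of the kind of helper being suggested; the name and exact shape are assumed here, not copied from the expire-snapshots tests:

      // Sketch of a rightAfterSnapshot-style helper (assumed shape): busy-wait until the
      // clock has moved strictly past the snapshot's commit time, so the returned value
      // can be used as an unambiguous "after this snapshot" start timestamp.
      private long rightAfterSnapshot(Snapshot snapshot) {
        long current = System.currentTimeMillis();
        while (current <= snapshot.timestampMillis()) {
          current = System.currentTimeMillis();
        }
        return current;
      }

   The test could then pass rightAfterSnapshot(snap2) as the start of the range instead of snap2.timestampMillis() + 1.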



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",

Review Comment:
   Too many negatives; I'm a bit confused. Does this capture it: "Cannot set both %s and %s"?
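   For illustration, the first check with that wording would read (sketch only, mirroring the quoted code above; the condition is unchanged, only the message is rephrased):

      Preconditions.checkArgument(
          !(startSnapshotId != null && startTimestamp != null),
          "Cannot set both %s and %s for changelogs",
          SparkReadOptions.START_SNAPSHOT_ID,
          SparkReadOptions.START_TIMESTAMP);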



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   To me, it may be slightly more user-friendly to return an empty result set here instead of failing. But I'm open to other opinions.
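   A rough sketch of that alternative, assuming oldestAncestorAfter returns null when no ancestor snapshot exists after the given timestamp (the empty-result handling in the caller is hypothetical, not part of the PR):

      // Sketch only: instead of failing via Preconditions, signal "nothing to scan"
      // and let buildChangelogScan() produce an empty changelog for this case.
      private Long getStartSnapshotId(Long startTimestamp) {
        Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
        if (oldestSnapshotAfter == null) {
          return null; // caller would treat a null start as an empty result set
        }
        return oldestSnapshotAfter.snapshotId();
      }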



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";

Review Comment:
   Looks like Flink has a config called 'start-snapshot-timestamp'. It's a bit wordy, but I'm wondering if we need to synchronize the two names (if they are related). cc @stevenzwu @pvary @hililiwei



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.START_SNAPSHOT_ID,
+        SparkReadOptions.START_TIMESTAMP);
+
+    Preconditions.checkArgument(
+        !(endSnapshotId != null && endTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.END_SNAPSHOT_ID,
+        SparkReadOptions.END_TIMESTAMP);
+
+    if (startTimestamp != null && endTimestamp != null) {
+      Preconditions.checkArgument(
+          startTimestamp < endTimestamp,
+          "Cannot set %s to be greater than %s for changelogs",
+          SparkReadOptions.START_TIMESTAMP,
+          SparkReadOptions.END_TIMESTAMP);
+    }
+
+    if (startTimestamp != null) {
+      startSnapshotId = getStartSnapshotId(startTimestamp);
+    }
+
+    if (endTimestamp != null) {
+      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+    }

Review Comment:
   Do we need to add corresponding checks in build() that these new properties are not set (like we check today for start-snapshot-id)?
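   Something along these lines in build(), mirroring the existing start-snapshot-id guard; placement and wording here are assumptions, not part of the PR:

      // Sketch only: reject the changelog-only timestamp options for a regular batch read.
      Preconditions.checkArgument(
          readConf.startTimestamp() == null && readConf.endTimestamp() == null,
          "Cannot set %s or %s for a batch read; they are only valid for changelog scans",
          SparkReadOptions.START_TIMESTAMP,
          SparkReadOptions.END_TIMESTAMP);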



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

