Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/10 15:30:47 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #6401: Flink: Change to oldestAncestorAfter

hililiwei opened a new pull request, #6401:
URL: https://github.com/apache/iceberg/pull/6401

   Addresses https://github.com/apache/iceberg/pull/6350#discussion_r1044901322


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1066580668


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -213,17 +213,12 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
             "Start snapshot id not found in history: " + scanContext.startSnapshotId());
         return Optional.of(matchedSnapshotById);
       case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
-        long snapshotIdAsOfTime =
-            SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
-        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
-        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
-          return Optional.of(matchedSnapshotByTimestamp);
-        } else {
-          // if the snapshotIdAsOfTime has the timestamp value smaller than the
-          // scanContext.startSnapshotTimestamp(),
-          // return the child snapshot whose timestamp value is larger
-          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
-        }
+        Snapshot matchedSnapshotByTimestamp =
+            SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
+        Preconditions.checkArgument(
+            matchedSnapshotByTimestamp != null,
+            "Cannot find a snapshot after: " + scanContext.startSnapshotTimestamp());
+        return Optional.of(matchedSnapshotByTimestamp);

Review Comment:
   This fix seems correct.
   
   The old logic actually has a small problem: it checks `List<HistoryEntry>` rather than the ancestors of the current table snapshot.
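
To illustrate the distinction drawn above, here is a minimal standalone sketch. It does not use the Iceberg API: `Snap`, `latestInHistoryAsOf`, and `ancestorIds` are hypothetical stand-ins, the history list is assumed to be in commit order, and the rollback's own log entry is left out for brevity. It models a table where snapshot 3 was rolled back before snapshot 4 was committed on top of snapshot 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HistoryVsAncestors {
  // Hypothetical stand-in for a snapshot: id, parent id (null for the root), commit time.
  static final class Snap {
    final long id;
    final Long parentId;
    final long timestampMillis;

    Snap(long id, Long parentId, long timestampMillis) {
      this.id = id;
      this.parentId = parentId;
      this.timestampMillis = timestampMillis;
    }
  }

  // History-style lookup: the last logged snapshot at or before the timestamp (null if none).
  static Snap latestInHistoryAsOf(List<Snap> history, long timestampMillis) {
    Snap match = null;
    for (Snap s : history) {
      if (s.timestampMillis <= timestampMillis) {
        match = s;
      }
    }
    return match;
  }

  // Ids reachable from the current snapshot by following parent pointers.
  static Set<Long> ancestorIds(Map<Long, Snap> byId, long currentId) {
    Set<Long> ids = new HashSet<>();
    Snap s = byId.get(currentId);
    while (s != null) {
      ids.add(s.id);
      s = s.parentId == null ? null : byId.get(s.parentId);
    }
    return ids;
  }

  // Example log: 1 <- 2 <- 3, then 3 is rolled back and 4 is committed on top of 2.
  static List<Snap> exampleHistory() {
    List<Snap> history = new ArrayList<>();
    history.add(new Snap(1L, null, 100L));
    history.add(new Snap(2L, 1L, 200L));
    history.add(new Snap(3L, 2L, 300L)); // rolled back later
    history.add(new Snap(4L, 2L, 400L)); // current snapshot
    return history;
  }

  public static void main(String[] args) {
    List<Snap> history = exampleHistory();
    Map<Long, Snap> byId = new HashMap<>();
    for (Snap s : history) {
      byId.put(s.id, s);
    }
    Snap fromHistory = latestInHistoryAsOf(history, 350L);
    Set<Long> ancestors = ancestorIds(byId, 4L);
    System.out.println("History lookup picks snapshot " + fromHistory.id);
    System.out.println("Ancestor of current snapshot: " + ancestors.contains(fromHistory.id));
  }
}
```

In this model the history-based lookup for timestamp 350 lands on snapshot 3, which is not on the ancestor chain of the current snapshot, so a reader starting from it would begin from the wrong lineage.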





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1068927544


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -459,7 +459,7 @@ public void testIncrementalFromSnapshotTimestampWithInvalidIds() throws Exceptio
     AssertHelpers.assertThrows(
         "Should detect invalid starting snapshot timestamp",
         IllegalArgumentException.class,
-        "Cannot find a snapshot older than " + invalidSnapshotTimestampMsStr,
+        "Cannot find a snapshot after: ",

Review Comment:
   nit: we can remove the variable defined on line 446 above, as it is no longer used after this change





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1056284703


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -213,17 +213,12 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
             "Start snapshot id not found in history: " + scanContext.startSnapshotId());
         return Optional.of(matchedSnapshotById);
       case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
-        long snapshotIdAsOfTime =
-            SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
-        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
-        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
-          return Optional.of(matchedSnapshotByTimestamp);
-        } else {
-          // if the snapshotIdAsOfTime has the timestamp value smaller than the
-          // scanContext.startSnapshotTimestamp(),
-          // return the child snapshot whose timestamp value is larger
-          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
-        }
+        Snapshot matchedSnapshotByTimestamp =
+            SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
+        Preconditions.checkArgument(
+            matchedSnapshotByTimestamp != null,
+            "Cannot find a snapshot after: " + scanContext.startSnapshotTimestamp());
+        return Optional.of(matchedSnapshotByTimestamp);

Review Comment:
   The old logic first looked for a snapshot whose timestamp equals `scanContext.startSnapshotTimestamp()`. If there was none, it returned the child snapshot whose timestamp is larger than `scanContext.startSnapshotTimestamp()`. This is consistent with `oldestAncestorAfter`.
   The change comes from https://github.com/apache/iceberg/pull/6350#discussion_r1044901322
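
As a rough illustration of why the two-step lookup and a single ancestor walk can agree, here is a simplified standalone sketch. It is not the Iceberg implementation: `Snap` and `oldestAncestorAtOrAfter` are hypothetical names, and the input is assumed to be the ancestor chain ordered newest first, as it would be when following parent pointers from the current snapshot.

```java
import java.util.Arrays;
import java.util.List;

public class OldestAncestorSketch {
  // Hypothetical stand-in for a snapshot: id and commit time only.
  static final class Snap {
    final long id;
    final long timestampMillis;

    Snap(long id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  // Walks ancestors from newest to oldest and returns the oldest one whose
  // timestamp is at or after the requested time; returns null if there is none.
  static Snap oldestAncestorAtOrAfter(List<Snap> newestFirst, long timestampMillis) {
    Snap last = null;
    for (Snap s : newestFirst) {
      if (s.timestampMillis < timestampMillis) {
        // Everything from here on is older, so the previous snapshot is the answer.
        return last;
      } else if (s.timestampMillis == timestampMillis) {
        // Exact match: the old code's first branch.
        return s;
      }
      last = s;
    }
    // Every ancestor is newer than the timestamp (or the list is empty).
    return last;
  }

  public static void main(String[] args) {
    List<Snap> ancestors =
        Arrays.asList(new Snap(3L, 300L), new Snap(2L, 200L), new Snap(1L, 100L));
    System.out.println(oldestAncestorAtOrAfter(ancestors, 200L).id); // exact match
    System.out.println(oldestAncestorAtOrAfter(ancestors, 150L).id); // falls between two snapshots
  }
}
```

Both calls in `main` resolve to snapshot 2: once through the exact-match branch, and once because snapshot 2 is the first snapshot committed after timestamp 150.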







[GitHub] [iceberg] stevenzwu merged pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
stevenzwu merged PR #6401:
URL: https://github.com/apache/iceberg/pull/6401




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1066582042


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -213,17 +213,12 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
             "Start snapshot id not found in history: " + scanContext.startSnapshotId());
         return Optional.of(matchedSnapshotById);
       case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
-        long snapshotIdAsOfTime =
-            SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
-        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
-        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
-          return Optional.of(matchedSnapshotByTimestamp);
-        } else {
-          // if the snapshotIdAsOfTime has the timestamp value smaller than the
-          // scanContext.startSnapshotTimestamp(),
-          // return the child snapshot whose timestamp value is larger
-          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
-        }
+        Snapshot matchedSnapshotByTimestamp =
+            SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
+        Preconditions.checkArgument(

Review Comment:
   nit: this seems redundant as `SnapshotUtil.oldestAncestorAfter` already throws an exception.







[GitHub] [iceberg] stevenzwu commented on pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#issuecomment-1382149280

   thanks @hililiwei for the contribution




[GitHub] [iceberg] rdblue commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1051685128


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -213,17 +213,12 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
             "Start snapshot id not found in history: " + scanContext.startSnapshotId());
         return Optional.of(matchedSnapshotById);
       case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
-        long snapshotIdAsOfTime =
-            SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
-        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
-        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
-          return Optional.of(matchedSnapshotByTimestamp);
-        } else {
-          // if the snapshotIdAsOfTime has the timestamp value smaller than the
-          // scanContext.startSnapshotTimestamp(),
-          // return the child snapshot whose timestamp value is larger
-          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
-        }
+        Snapshot matchedSnapshotByTimestamp =
+            SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
+        Preconditions.checkArgument(
+            matchedSnapshotByTimestamp != null,
+            "Cannot find a snapshot after: " + scanContext.startSnapshotTimestamp());
+        return Optional.of(matchedSnapshotByTimestamp);

Review Comment:
   @stevenzwu FYI.
   
   Isn't this a behavior change? I don't think we should make a behavior change.





[GitHub] [iceberg] hililiwei commented on pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#issuecomment-1383339946

   thanks @stevenzwu @rdblue @PraveenNanda124 




[GitHub] [iceberg] hililiwei commented on a diff in pull request #6401: Flink: Change to oldestAncestorAfter

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6401:
URL: https://github.com/apache/iceberg/pull/6401#discussion_r1066631675


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -213,17 +213,12 @@ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
             "Start snapshot id not found in history: " + scanContext.startSnapshotId());
         return Optional.of(matchedSnapshotById);
       case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
-        long snapshotIdAsOfTime =
-            SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
-        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
-        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
-          return Optional.of(matchedSnapshotByTimestamp);
-        } else {
-          // if the snapshotIdAsOfTime has the timestamp value smaller than the
-          // scanContext.startSnapshotTimestamp(),
-          // return the child snapshot whose timestamp value is larger
-          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
-        }
+        Snapshot matchedSnapshotByTimestamp =
+            SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
+        Preconditions.checkArgument(

Review Comment:
   ```
       appendTwoSnapshots();
   
       long invalidSnapshotTimestampMs = snapshot2.timestampMillis() + 1000L;
   ```
   `SnapshotUtil.oldestAncestorAfter` returns null when the timestamp is later than the table's latest snapshot, or when the table has no snapshots.
   The unit test fails for this reason.
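
The null case described above, and the guard that turns it into an `IllegalArgumentException`, can be sketched without Iceberg as follows. This is only an analogy for a linear history: a `TreeMap` keyed by commit time stands in for the snapshot lookup, and `StartSnapshotGuard` and `startSnapshotId` are hypothetical names. The error message mirrors the one in the diff.

```java
import java.util.Map;
import java.util.TreeMap;

public class StartSnapshotGuard {
  // Returns the id of the first snapshot committed at or after the start timestamp.
  // ceilingEntry yields null when no such snapshot exists, which covers both an
  // empty table and a timestamp later than the latest snapshot.
  static long startSnapshotId(TreeMap<Long, Long> idByTimestamp, long startTimestampMillis) {
    Map.Entry<Long, Long> entry = idByTimestamp.ceilingEntry(startTimestampMillis);
    if (entry == null) {
      throw new IllegalArgumentException(
          "Cannot find a snapshot after: " + startTimestampMillis);
    }
    return entry.getValue();
  }

  public static void main(String[] args) {
    TreeMap<Long, Long> idByTimestamp = new TreeMap<>();
    idByTimestamp.put(1000L, 1L);
    idByTimestamp.put(2000L, 2L);

    System.out.println(startSnapshotId(idByTimestamp, 1500L));
    try {
      // Same shape as the test: latest snapshot timestamp plus 1000 ms.
      startSnapshotId(idByTimestamp, 2000L + 1000L);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Without the explicit check, the null would surface later as a less descriptive failure, which is the motivation given for keeping the precondition.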
   


