Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/04 03:23:36 UTC

[GitHub] [flink-table-store] zjureel opened a new pull request, #351: [FLINK-28812] Support to read compacted snapshot only

zjureel opened a new pull request, #351:
URL: https://github.com/apache/flink-table-store/pull/351

   Support to read compacted snapshot only


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1015169082


##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java:
##########
@@ -296,6 +299,52 @@ public void testWriteWithoutCompaction() throws Exception {
         }
     }
 
+    @Test
+    public void testReadCompactedSnapshot() throws Exception {
+        writeCompactData();
+        FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(CoreOptions.READ_COMPACTED, true));
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+        while (snapshotIterator.hasNext()) {
+            Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+            }
+        }
+
+        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        List<Split> splits = plan.splits();
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);

Review Comment:
   Here the latest snapshot is already a compacted snapshot, so this test cannot tell whether your implementation works or whether it simply reads the latest snapshot. Please change the test.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -75,7 +76,8 @@ public AbstractFileStoreScan(
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer) {
+            CoreOptions.ChangelogProducer changelogProducer,
+            boolean readCompacted) {

Review Comment:
   I prefer adding a `withReadCompacted(boolean)` method to the scan interface, just like `withIncrement` and other methods. That way we don't need to pass the parameter all the way through the constructors.
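The suggestion above can be illustrated with a minimal sketch of a fluent setter. The types `SketchScan` and `SketchScanImpl` and the method `describePlan()` are hypothetical stand-ins invented for this example; only the method names `withReadCompacted` and `withLevel` come from the thread, and the real scan interface may look different.

```java
// Hypothetical, simplified stand-ins for illustration only; these are not
// the real flink-table-store scan types.
interface SketchScan {
    SketchScan withLevel(int level);

    SketchScan withReadCompacted(boolean readCompacted);

    // Returns a textual summary instead of a real plan, to keep the sketch small.
    String describePlan();
}

final class SketchScanImpl implements SketchScan {
    private Integer level; // null means "no level filter"
    private boolean readCompacted; // defaults to false

    @Override
    public SketchScan withLevel(int level) {
        this.level = level;
        return this;
    }

    @Override
    public SketchScan withReadCompacted(boolean readCompacted) {
        // The flag is set on the scan itself, so no constructor needs an extra parameter.
        this.readCompacted = readCompacted;
        return this;
    }

    @Override
    public String describePlan() {
        return "level=" + level + ", readCompacted=" + readCompacted;
    }
}
```

With this shape, a caller opts in at the point of use, for example `new SketchScanImpl().withReadCompacted(true)`, and callers that don't care about the option are left unchanged.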



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -87,6 +89,7 @@ public AbstractFileStoreScan(
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.changelogProducer = changelogProducer;
+        this.readCompacted = readCompacted;

Review Comment:
   What happens if the user sets the changelog producer to `INPUT` and at the same time uses `readCompacted`? It seems that we'll scan nothing, which is unfriendly to the user.
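One possible way to address this concern is to fail fast with an explanatory message when the two settings are combined. The sketch below is only an illustration of that option, not what the PR ended up doing; the class `ReadCompactedValidation`, its local `ChangelogProducer` enum, and the error text are all assumptions made for this example.

```java
// Hypothetical validation helper; the enum is a local stand-in and not
// the real CoreOptions.ChangelogProducer.
final class ReadCompactedValidation {
    enum ChangelogProducer {
        NONE,
        INPUT
    }

    private ReadCompactedValidation() {}

    // Rejects the combination the review comment warns about, so the user
    // gets an error message instead of a silently empty scan.
    static void validate(ChangelogProducer producer, boolean readCompacted) {
        if (readCompacted && producer == ChangelogProducer.INPUT) {
            throw new IllegalArgumentException(
                    "Reading compacted snapshots only is not supported together with "
                            + "changelog producer INPUT.");
        }
    }
}
```

Whether to reject the combination, ignore one of the settings, or log a warning is a design choice for the maintainers; the sketch only shows the fail-fast variant.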



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,25 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long latestCompactedSnapshotId() {
+        try {
+            Iterator<Snapshot> iterator = snapshots();
+            Long maxCompactedSnapshotId = null;
+            while (iterator.hasNext()) {
+                Snapshot snapshot = iterator.next();
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    if (maxCompactedSnapshotId == null || snapshot.id() > maxCompactedSnapshotId) {
+                        maxCompactedSnapshotId = snapshot.id();
+                    }
+                }
+            }
+
+            return maxCompactedSnapshotId;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest compacted snapshot id", e);
+        }

Review Comment:
   This implementation is very inefficient with thousands of snapshots. Try iterating backwards from the latest snapshot. When you spot a COMPACT snapshot you can return directly.
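The backward scan described above can be sketched as follows. This is a self-contained illustration under stated assumptions: snapshots are modelled as an in-memory `TreeMap` from snapshot id to commit kind, and `LatestCompactedLookup` and its `CommitKind` enum are hypothetical names, not the real `SnapshotManager` or `Snapshot.CommitKind`.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the snapshot lookup; the map plays the role of
// the snapshot directory, keyed by snapshot id.
final class LatestCompactedLookup {
    enum CommitKind {
        APPEND,
        COMPACT
    }

    private LatestCompactedLookup() {}

    // Walks from the latest id down to the earliest one and returns as soon
    // as a COMPACT snapshot is found. Returns null if there is none.
    static Long latestCompactedSnapshotId(TreeMap<Long, CommitKind> snapshots) {
        if (snapshots.isEmpty()) {
            return null;
        }
        long earliestId = snapshots.firstKey();
        long latestId = snapshots.lastKey();
        for (long id = latestId; id >= earliestId; id--) {
            // Ids missing from the map yield null and are simply skipped.
            if (snapshots.get(id) == CommitKind.COMPACT) {
                return id;
            }
        }
        return null;
    }
}
```

When compaction runs regularly, the most recent COMPACT snapshot tends to sit close to the latest id, so the loop usually inspects only a few snapshots instead of all of them.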





[GitHub] [flink-table-store] JingsongLi merged pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #351:
URL: https://github.com/apache/flink-table-store/pull/351




[GitHub] [flink-table-store] zjureel commented on pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
zjureel commented on PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#issuecomment-1307061154

   Thanks for the review, @tsreaper. I have updated the code. Could you take another look when you're free? Thanks.




[GitHub] [flink-table-store] zjureel commented on pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
zjureel commented on PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#issuecomment-1311506738

   Thanks @tsreaper, I have updated the code.




[GitHub] [flink-table-store] zjureel commented on pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
zjureel commented on PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#issuecomment-1323369708

   Hi @JingsongLi, I have rebased the code onto master and added test cases for the connector. Could you review them when you're free? Thanks.




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1026052672


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -100,6 +100,9 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
                     // Only full compaction results will appear on the max level.
                     plan = scan.withLevel(table.options().numLevels() - 1).plan();
                 } else {
+                    if (!isContinuous) {

Review Comment:
   I think we should enable read-compacted for full streaming reads too.
   In essence, we are introducing this option to make merging on read more efficient. A streaming read that does a full read followed by incremental reads needs the same capability in its full-read part.





[GitHub] [flink-table-store] tsreaper commented on pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
tsreaper commented on PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#issuecomment-1311162542

   Hi @zjureel , please resolve conflicts with master so that I can continue reviewing.




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1026054061


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -100,6 +100,9 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
                     // Only full compaction results will appear on the max level.
                     plan = scan.withLevel(table.options().numLevels() - 1).plan();
                 } else {
+                    if (!isContinuous) {
+                        scan.withReadCompacted(table.options().readCompacted());

Review Comment:
   Since you have modified connector code, it would be better to add test cases to `ContinuousFileStoreITCase` and `BatchFileStoreITCase` too.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #351: [FLINK-28812] Support to read compacted snapshot only

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1026053108


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,27 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
 
+    public @Nullable Long latestCompactedSnapshotId() {
+        Long latestId = latestSnapshotId();
+        Long earliestId = earliestSnapshotId();
+        if (latestId == null || earliestId == null) {
+            return null;
+        }
+
+        long snapshotId = latestId;
+        while (snapshotId >= earliestId) {

Review Comment:
   Minor: we can use a for loop here instead of the while loop.


