Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/13 04:40:55 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

tsreaper opened a new pull request, #316:
URL: https://github.com/apache/flink-table-store/pull/316

   Currently, changelog files are stored as extra files in `DataFileMeta`. However, the full-compaction changelog we are about to introduce cannot be stored as extra files, because its statistics may differ from those of the corresponding merge tree files.
   
   We need to extract changelog files out of `DataFileMeta#extraFiles`.
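   To illustrate why a shared `DataFileMeta` no longer works, here is a minimal sketch using hypothetical, simplified types (`CommitIncrementSketch`, `FileMeta`, `CommitIncrement` are not the actual flink-table-store classes). Each file carries its own statistics, so changelog files get their own metadata list instead of being listed by name in `extraFiles`. The file names and numbers are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model, not the actual flink-table-store classes.
// Each file carries its own statistics, so changelog files get their own
// metadata list instead of being listed by name in DataFileMeta#extraFiles.
public class CommitIncrementSketch {

    /** Simplified per-file metadata: file name, row count and key range. */
    public static final class FileMeta {
        public final String fileName;
        public final long rowCount;
        public final int minKey;
        public final int maxKey;

        public FileMeta(String fileName, long rowCount, int minKey, int maxKey) {
            this.fileName = fileName;
            this.rowCount = rowCount;
            this.minKey = minKey;
            this.maxKey = maxKey;
        }
    }

    /** Files produced by one flush or compaction, kept in two separate lists. */
    public static final class CommitIncrement {
        public final List<FileMeta> newFiles = new ArrayList<>();
        public final List<FileMeta> changelogFiles = new ArrayList<>();
    }

    /** Builds an illustrative increment; the numbers are invented for the example. */
    public static CommitIncrement example() {
        CommitIncrement increment = new CommitIncrement();
        // After a full compaction, a deleted key (7) no longer appears in the
        // data file but still appears in the changelog as a DELETE record,
        // so the two files have different row counts and key ranges.
        increment.newFiles.add(new FileMeta("data-1", 2, 1, 5));
        increment.changelogFiles.add(new FileMeta("changelog-1", 3, 1, 7));
        return increment;
    }

    public static void main(String[] args) {
        CommitIncrement increment = example();
        FileMeta data = increment.newFiles.get(0);
        FileMeta changelog = increment.changelogFiles.get(0);
        System.out.println(data.fileName + " maxKey=" + data.maxKey);
        System.out.println(changelog.fileName + " maxKey=" + changelog.maxKey);
    }
}
```

   If the changelog file were only a name in the data file's `extraFiles`, its key range (up to 7 here) would be described by the data file's statistics (up to 5), which is exactly the mismatch the description refers to.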


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995324401


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,46 +133,31 @@ public void flushMemory() throws Exception {
                 trySyncLatestCompaction(true);
             }
 
-            // write changelog file
-            List<String> extraFiles = new ArrayList<>();
-            if (changelogProducer == ChangelogProducer.INPUT) {
-                SingleFileWriter<KeyValue, Void> writer = writerFactory.createChangelogFileWriter();
-                writer.write(memTable.rawIterator());
-                writer.close();
-                extraFiles.add(writer.path().getName());
-            }
-
             // write lsm level 0 file
-            try {
-                Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
-                KeyValueDataFileWriter writer = writerFactory.createLevel0Writer();
-                writer.write(iterator);
-                writer.close();
-
-                // In theory, this fileMeta should contain statistics from both lsm file extra file.
-                // However for level 0 files, as we do not drop DELETE records, keys appear in one
-                // file will also appear in the other. So we just need to use statistics from one of
-                // them.
-                //
-                // For value count merge function, it is possible that we have changelog first
-                // adding one record then remove one record, but after merging this record will not
-                // appear in lsm file. This is OK because we can also skip this changelog.
-                DataFileMeta fileMeta = writer.result();
-                if (fileMeta == null) {
-                    for (String extraFile : extraFiles) {
-                        writerFactory.deleteFile(extraFile);
+            Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);

Review Comment:
   Writing the data file first sorts the records in the write buffer, so the changelog will no longer be in input order.
   
   That may not be a bad thing, though, because with https://github.com/apache/flink-table-store/pull/315 it is impossible to maintain the input order anyway.
   
   It would be better to add a comment here noting this write order.
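   The ordering effect can be illustrated with a minimal sketch. `SortBufferSketch` and its `raw()` / `merged()` methods are hypothetical stand-ins for the memtable's `rawIterator()` and `mergeIterator(...)`, assuming (as the comment says) that producing the merged output sorts the buffer in place.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical buffer, not the real table store memtable. Producing the
// merged (LSM) output sorts the buffer in place, so a raw read taken
// afterwards sees key order instead of input order.
public class SortBufferSketch {

    /** A buffered record: key, sequence number and value. */
    static final class BufferedRecord {
        final int key;
        final long sequence;
        final String value;

        BufferedRecord(int key, long sequence, String value) {
            this.key = key;
            this.sequence = sequence;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + ":" + value;
        }
    }

    private final List<BufferedRecord> buffer = new ArrayList<>();
    private long nextSequence = 0;

    public void put(int key, String value) {
        buffer.add(new BufferedRecord(key, nextSequence++, value));
    }

    /** Returns a copy of the buffered records in their current physical order. */
    public List<BufferedRecord> raw() {
        return new ArrayList<>(buffer);
    }

    /** Sorts the buffer in place by (key, sequence) and keeps the latest record per key. */
    public List<BufferedRecord> merged() {
        buffer.sort(Comparator.comparingInt((BufferedRecord r) -> r.key)
                .thenComparingLong(r -> r.sequence));
        List<BufferedRecord> result = new ArrayList<>();
        for (BufferedRecord r : buffer) {
            int last = result.size() - 1;
            if (last >= 0 && result.get(last).key == r.key) {
                result.set(last, r); // higher sequence wins, like a deduplicate merge
            } else {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SortBufferSketch buf = new SortBufferSketch();
        buf.put(3, "a");
        buf.put(1, "b");
        buf.put(3, "c");
        System.out.println("raw before merge: " + buf.raw());    // [3:a, 1:b, 3:c]
        System.out.println("merged:           " + buf.merged()); // [1:b, 3:c]
        System.out.println("raw after merge:  " + buf.raw());    // [1:b, 3:a, 3:c]
    }
}
```

   Reading the raw records before merging (the order used by the removed lines of the diff) yields input order; reading them after merging (the new order) yields key order, which is the behavior change the comment asks to document.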





[GitHub] [flink-table-store] tsreaper merged pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

Posted by GitBox <gi...@apache.org>.
tsreaper merged PR #316:
URL: https://github.com/apache/flink-table-store/pull/316




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995319127


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -190,12 +192,18 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
         Long safeLatestSnapshotId = null;
         List<ManifestEntry> baseEntries = new ArrayList<>();
 
-        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
-        List<ManifestEntry> compactChanges = new ArrayList<>();
-        compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
-        compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
-
-        if (createEmptyCommit || !appendChanges.isEmpty()) {
+        List<ManifestEntry> appendMergeTree = new ArrayList<>();

Review Comment:
   How about renaming `appendMergeTree` to `appendDataFiles`, without "mergeTree" in the name?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,46 +133,31 @@ public void flushMemory() throws Exception {
                 trySyncLatestCompaction(true);
             }
 
-            // write changelog file
-            List<String> extraFiles = new ArrayList<>();
-            if (changelogProducer == ChangelogProducer.INPUT) {
-                SingleFileWriter<KeyValue, Void> writer = writerFactory.createChangelogFileWriter();
-                writer.write(memTable.rawIterator());
-                writer.close();
-                extraFiles.add(writer.path().getName());
-            }
-
             // write lsm level 0 file
-            try {
-                Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
-                KeyValueDataFileWriter writer = writerFactory.createLevel0Writer();
-                writer.write(iterator);
-                writer.close();
-
-                // In theory, this fileMeta should contain statistics from both lsm file extra file.
-                // However for level 0 files, as we do not drop DELETE records, keys appear in one
-                // file will also appear in the other. So we just need to use statistics from one of
-                // them.
-                //
-                // For value count merge function, it is possible that we have changelog first
-                // adding one record then remove one record, but after merging this record will not
-                // appear in lsm file. This is OK because we can also skip this changelog.
-                DataFileMeta fileMeta = writer.result();
-                if (fileMeta == null) {
-                    for (String extraFile : extraFiles) {
-                        writerFactory.deleteFile(extraFile);
+            Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
+            KeyValueDataFileWriter writer = writerFactory.createMergeTreeFileWriter(0);
+            writer.write(iterator);
+            writer.close();
+            DataFileMeta fileMeta = writer.result();
+
+            if (fileMeta != null) {
+                newFiles.add(fileMeta);
+                compactManager.addNewFile(fileMeta);
+
+                // write changelog file
+                if (changelogProducer == ChangelogProducer.INPUT) {
+                    try {
+                        KeyValueDataFileWriter changelogWriter =
+                                writerFactory.createChangelogFileWriter(0);
+                        changelogWriter.write(memTable.rawIterator());
+                        changelogWriter.close();
+                        changelogFiles.add(changelogWriter.result());
+                    } catch (Exception e) {
+                        // exception occurs, clean up written file
+                        writerFactory.deleteFile(fileMeta.fileName());

Review Comment:
   No need to delete the file here; it has already been added to `newFiles`.
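   A minimal sketch of the cleanup model this comment seems to assume: once a data file is added to `newFiles`, whoever aborts the writer deletes it, so `flushMemory()` only needs to propagate the changelog failure. `FlushCleanupSketch`, its `flush()` / `abort()` methods and the simulated `storage` set are hypothetical and not the real `MergeTreeWriter` API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical writer, not the real MergeTreeWriter. Assumption: files added
// to newFiles / changelogFiles are deleted by abort() if the write fails, so
// flush() does not need its own cleanup for the data file.
public class FlushCleanupSketch {

    /** Simulated file system: names of files that currently exist. */
    public final Set<String> storage = new LinkedHashSet<>();
    /** Uncommitted data files; abort() is responsible for deleting them. */
    public final List<String> newFiles = new ArrayList<>();
    /** Uncommitted changelog files; also deleted by abort(). */
    public final List<String> changelogFiles = new ArrayList<>();

    private final boolean failChangelogWrite;
    private int fileCounter = 0;

    public FlushCleanupSketch(boolean failChangelogWrite) {
        this.failChangelogWrite = failChangelogWrite;
    }

    /** Writes a data file, then a changelog file; no local cleanup on failure. */
    public void flush() {
        String dataFile = "data-" + fileCounter++;
        storage.add(dataFile);
        newFiles.add(dataFile); // from here on, abort() owns this file

        if (failChangelogWrite) {
            // Just propagate: deleting dataFile here would leave newFiles
            // referring to a file that no longer exists.
            throw new IllegalStateException("simulated changelog write failure");
        }
        String changelogFile = "changelog-" + fileCounter++;
        storage.add(changelogFile);
        changelogFiles.add(changelogFile);
    }

    /** Deletes every uncommitted file this writer has produced. */
    public void abort() {
        storage.removeAll(newFiles);
        storage.removeAll(changelogFiles);
        newFiles.clear();
        changelogFiles.clear();
    }

    public static void main(String[] args) {
        FlushCleanupSketch writer = new FlushCleanupSketch(true);
        try {
            writer.flush();
        } catch (IllegalStateException e) {
            System.out.println("flush failed, pending files: " + writer.newFiles);
            writer.abort();
        }
        System.out.println("files left after abort: " + writer.storage);
    }
}
```

   In this model the data file is deleted exactly once, by `abort()`, whether the failure happens during the data write or the changelog write.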





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995325790


##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java:
##########
@@ -189,6 +192,80 @@ public void testStreamingChangelog() throws Exception {
                                 "+U 1|10|102|binary|varbinary"));
     }
 
+    @Test
+    public void testStreamingChangelogCompatibility02() throws Exception {
+        // already contains 2 commits
+        CompatibilityTestUtils.unzip("compatibility/0.2-changelog-table.zip", tablePath.getPath());

Review Comment:
   rename to `table-changelog-0.2.zip`?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995436069


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -422,25 +490,36 @@ private boolean tryCommitOnce(
             previousChangesListName = manifestList.write(newMetas);
 
             // write new changes into manifest files
-            List<ManifestFileMeta> newChangesManifests = manifestFile.write(changes);
+            List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
             newMetas.addAll(newChangesManifests);
             newChangesListName = manifestList.write(newChangesManifests);
 
+            // write changelog into manifest files
+            changelogMetas.addAll(manifestFile.write(changelogFiles));

Review Comment:
   Can we avoid writing redundant manifest files when there are no changelog files?
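   One way to address this, sketched under assumptions: guard the `changelogMetas.addAll(manifestFile.write(changelogFiles))` call so the manifest writer is never invoked for an empty list. `ChangelogManifestSketch` and `writeChangelogManifests` are hypothetical; the `Function` parameter only stands in for `manifestFile.write(...)` and is not its real signature.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper; the writer function stands in for something like
// manifestFile.write(...) and is an assumption, not the real signature.
public class ChangelogManifestSketch {

    /**
     * Writes changelog entries into manifest files only when there is at least
     * one entry; otherwise returns an empty list without calling the writer.
     */
    public static <E, M> List<M> writeChangelogManifests(
            List<E> changelogEntries, Function<List<E>, List<M>> manifestWriter) {
        if (changelogEntries.isEmpty()) {
            return Collections.emptyList(); // no changelog: no extra manifest file
        }
        return manifestWriter.apply(changelogEntries);
    }

    public static void main(String[] args) {
        int[] writes = {0}; // counts how often a manifest file is "written"
        Function<List<String>, List<String>> writer = entries -> {
            writes[0]++;
            return Collections.singletonList("manifest-" + writes[0]);
        };

        System.out.println(writeChangelogManifests(Collections.<String>emptyList(), writer));
        System.out.println(writeChangelogManifests(Arrays.asList("changelog-1"), writer));
        System.out.println("manifest files written: " + writes[0]);
    }
}
```

   The same guard would presumably apply to any manifest list written for the changelog metas, so that commits without changelog produce no changelog-related files at all.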


