Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/14 04:03:45 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #316: [FLINK-29612] Extract changelog files out of DataFileMeta#extraFiles

JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995319127


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -190,12 +192,18 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
         Long safeLatestSnapshotId = null;
         List<ManifestEntry> baseEntries = new ArrayList<>();
 
-        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
-        List<ManifestEntry> compactChanges = new ArrayList<>();
-        compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
-        compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
-
-        if (createEmptyCommit || !appendChanges.isEmpty()) {
+        List<ManifestEntry> appendMergeTree = new ArrayList<>();

Review Comment:
   This is not merge-tree specific; how about naming it `appendDataFiles`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,46 +133,31 @@ public void flushMemory() throws Exception {
                 trySyncLatestCompaction(true);
             }
 
-            // write changelog file
-            List<String> extraFiles = new ArrayList<>();
-            if (changelogProducer == ChangelogProducer.INPUT) {
-                SingleFileWriter<KeyValue, Void> writer = writerFactory.createChangelogFileWriter();
-                writer.write(memTable.rawIterator());
-                writer.close();
-                extraFiles.add(writer.path().getName());
-            }
-
             // write lsm level 0 file
-            try {
-                Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
-                KeyValueDataFileWriter writer = writerFactory.createLevel0Writer();
-                writer.write(iterator);
-                writer.close();
-
-                // In theory, this fileMeta should contain statistics from both lsm file extra file.
-                // However for level 0 files, as we do not drop DELETE records, keys appear in one
-                // file will also appear in the other. So we just need to use statistics from one of
-                // them.
-                //
-                // For value count merge function, it is possible that we have changelog first
-                // adding one record then remove one record, but after merging this record will not
-                // appear in lsm file. This is OK because we can also skip this changelog.
-                DataFileMeta fileMeta = writer.result();
-                if (fileMeta == null) {
-                    for (String extraFile : extraFiles) {
-                        writerFactory.deleteFile(extraFile);
+            Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
+            KeyValueDataFileWriter writer = writerFactory.createMergeTreeFileWriter(0);
+            writer.write(iterator);
+            writer.close();
+            DataFileMeta fileMeta = writer.result();
+
+            if (fileMeta != null) {
+                newFiles.add(fileMeta);
+                compactManager.addNewFile(fileMeta);
+
+                // write changelog file
+                if (changelogProducer == ChangelogProducer.INPUT) {
+                    try {
+                        KeyValueDataFileWriter changelogWriter =
+                                writerFactory.createChangelogFileWriter(0);
+                        changelogWriter.write(memTable.rawIterator());
+                        changelogWriter.close();
+                        changelogFiles.add(changelogWriter.result());
+                    } catch (Exception e) {
+                        // exception occurs, clean up written file
+                        writerFactory.deleteFile(fileMeta.fileName());

Review Comment:
   No need to delete the file here; it is already in `newFiles`.
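
   To illustrate the point, here is a minimal, self-contained sketch. It is not the Table Store code: `TrackedFilesSketch`, `flush`, `abort`, and the `deleted` list are hypothetical names, and it assumes that some later cleanup path deletes everything tracked in `newFiles`. Under that assumption, deleting the data file again in the `catch` block is redundant.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a writer that tracks the files it creates. */
class TrackedFilesSketch {

    /** Data files written so far; plays the role of `newFiles` in the PR. */
    final List<String> newFiles = new ArrayList<>();

    /** Changelog files written so far; plays the role of `changelogFiles`. */
    final List<String> changelogFiles = new ArrayList<>();

    /** Records each deletion so a double delete would be visible. */
    final List<String> deleted = new ArrayList<>();

    /** Writes one data file and then its changelog file, which may fail. */
    void flush(String dataFile, boolean changelogFails) {
        newFiles.add(dataFile); // the data file is tracked from here on
        if (changelogFails) {
            // No local delete of dataFile: it is already tracked in newFiles,
            // so the (assumed) cleanup path below owns its removal.
            throw new IllegalStateException("changelog write failed");
        }
        changelogFiles.add("changelog-" + dataFile);
    }

    /** Assumed cleanup path: removes every tracked file exactly once. */
    void abort() {
        deleted.addAll(newFiles);
        deleted.addAll(changelogFiles);
        newFiles.clear();
        changelogFiles.clear();
    }
}
```

   With a single owner for cleanup, a failed changelog write only needs to propagate the exception; each tracked file is then removed once rather than twice.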


