Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 08:50:59 UTC

[GitHub] [flink-table-store] tsreaper commented on a change in pull request #13: [FLINK-25770] Delete file is not correct in MergeTreeWriter

tsreaper commented on a change in pull request #13:
URL: https://github.com/apache/flink-table-store/pull/13#discussion_r790492629



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -134,26 +139,36 @@ public void sync() throws Exception {
     }
 
     private Increment drainIncrement() {
+        // drain files to create Increment
         Increment increment =
                 new Increment(
                         new ArrayList<>(newFiles),
-                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactBefore.values()),
                         new ArrayList<>(compactAfter));
         newFiles.clear();
         compactBefore.clear();
         compactAfter.clear();
+
+        // return increment
         return increment;

Review comment:
       No need for this comment.
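
For context, the copy-then-clear pattern in `drainIncrement()` can be sketched in isolation. This is only an illustration under stated assumptions: `DrainSketch` and its nested `Increment` are hypothetical stand-ins, file metadata is reduced to a `String` name, and the key and value types of `compactBefore` are guesses (the diff only shows that `values()` is called on it).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the writer's pending-file bookkeeping. */
final class DrainSketch {

    /** Snapshot of the three file collections at drain time. */
    static final class Increment {
        final List<String> newFiles;
        final List<String> compactBefore;
        final List<String> compactAfter;

        Increment(List<String> newFiles, List<String> compactBefore, List<String> compactAfter) {
            this.newFiles = newFiles;
            this.compactBefore = compactBefore;
            this.compactAfter = compactAfter;
        }
    }

    final List<String> newFiles = new ArrayList<>();
    // Assumption: a map keyed by file name; the real key and value types are not shown in the diff.
    final Map<String, String> compactBefore = new LinkedHashMap<>();
    final List<String> compactAfter = new ArrayList<>();

    Increment drainIncrement() {
        // Copy first, so the snapshot is unaffected by the clear() calls below.
        Increment increment =
                new Increment(
                        new ArrayList<>(newFiles),
                        new ArrayList<>(compactBefore.values()),
                        new ArrayList<>(compactAfter));
        newFiles.clear();
        compactBefore.clear();
        compactAfter.clear();
        return increment;
    }

    public static void main(String[] args) {
        DrainSketch writer = new DrainSketch();
        writer.newFiles.add("sst-1");
        writer.compactBefore.put("sst-0", "sst-0");
        writer.compactAfter.add("sst-2");
        Increment increment = writer.drainIncrement();
        System.out.println(increment.newFiles + " " + increment.compactBefore + " " + increment.compactAfter);
    }
}
```

The defensive copies are what make the subsequent `clear()` calls safe; passing the live collections to `Increment` would leave the returned snapshot empty.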

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -112,8 +115,10 @@ private void flush() throws Exception {
             Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
             List<SstFileMeta> files =
                     sstFile.write(CloseableIterator.adapterForIterator(iterator), 0);
-            newFiles.addAll(files);
-            files.forEach(levels::addLevel0File);
+            for (SstFileMeta file : files) {
+                newFiles.add(file);
+                levels.addLevel0File(file);
+            }

Review comment:
       No need for this change?
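
A small sketch of why the two forms look interchangeable here. It assumes `addLevel0File` has no side effect that depends on the state of `newFiles`; `FlushSketch` and `Levels` are hypothetical stand-ins, not the project's classes, and files are reduced to `String` names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical comparison of the removed and added forms of the flush bookkeeping. */
final class FlushSketch {

    /** Stand-in for the level structure; assumed to simply record level-0 files in order. */
    static final class Levels {
        final List<String> level0 = new ArrayList<>();

        void addLevel0File(String file) {
            level0.add(file);
        }
    }

    /** The removed form: bulk add, then register every file with the levels. */
    static List<List<String>> bulkForm(List<String> files) {
        List<String> newFiles = new ArrayList<>();
        Levels levels = new Levels();
        newFiles.addAll(files);
        files.forEach(levels::addLevel0File);
        return Arrays.asList(newFiles, levels.level0);
    }

    /** The added form: one loop that does both per file. */
    static List<List<String>> loopForm(List<String> files) {
        List<String> newFiles = new ArrayList<>();
        Levels levels = new Levels();
        for (String file : files) {
            newFiles.add(file);
            levels.addLevel0File(file);
        }
        return Arrays.asList(newFiles, levels.level0);
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("sst-1", "sst-2");
        System.out.println(bulkForm(files).equals(loopForm(files)));
    }
}
```

Under that assumption both forms end with the same contents in the same order, so the choice is purely stylistic.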

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -134,26 +139,36 @@ public void sync() throws Exception {
     }
 
     private Increment drainIncrement() {
+        // drain files to create Increment

Review comment:
       No need for this comment.

##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
##########
@@ -61,7 +61,12 @@
 import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link MergeTree}. */
+/**
+ * Test for {@link MergeTree}.
+ *
+ * <p>Manual test: please adjust TARGET_FILE_SIZE to 1, so that a large number of upgrade files will
+ * be generated.

Review comment:
       No need for a manual test. For `testCloseUpgrade`, set the write buffer size to 4 KB, the page size to 1 KB, and the target file size to 1 KB. With 1000 records and `@RepeatedTest(10)` running the test 10 times, it is quite likely to fail without this fix.

##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
##########
@@ -173,7 +173,11 @@ public void run() {
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-            writer.close();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       Remove the `try { writer.sync() }` block above this code fragment, because `writer.close()` already includes it.
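
A minimal sketch of the suggested shape, assuming (as the comment states) that `close()` performs the sync itself. `CloseSketch` and `RecordingWriter` are hypothetical and only record the call order; they are not the project's writer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: when close() syncs internally, a preceding sync() call is redundant. */
final class CloseSketch {

    /** Stand-in writer that records which operations ran, in order. */
    static final class RecordingWriter {
        final List<String> calls = new ArrayList<>();

        void sync() throws Exception {
            calls.add("sync");
        }

        void close() throws Exception {
            // Assumption taken from the review comment: closing includes a sync.
            sync();
            calls.add("close");
        }
    }

    /** The caller only needs the single try/catch around close(). */
    static List<String> closeOnly() {
        RecordingWriter writer = new RecordingWriter();
        try {
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return writer.calls;
    }

    public static void main(String[] args) {
        System.out.println(closeOnly());
    }
}
```

With this shape the test thread keeps one `try`/`catch` instead of two, and the sync still happens exactly once.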



