Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/29 13:18:13 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #178: [FLINK-28244] Introduce changelog file for DataFile

JingsongLi commented on code in PR #178:
URL: https://github.com/apache/flink-table-store/pull/178#discussion_r909620472


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -105,32 +106,60 @@ public DataFilePathFactory pathFactory() {
         return pathFactory;
     }
 
+    /** Write raw {@link KeyValue} iterator into a changelog file. */
+    public Path writeLevel0Changelog(Iterator<KeyValue> iterator) throws Exception {
+        FileWriter.Factory<KeyValue, Metric> writerFactory = createFileWriterFactory();
+        Path changelogPath = pathFactory.newChangelogPath();
+        doWrite(writerFactory.create(changelogPath), iterator);
+        return changelogPath;
+    }
+
+    /**
+     * Write several {@link KeyValue}s into a data file of level 0.
+     *
+     * @return empty if iterator is empty
+     */
+    public Optional<DataFileMeta> writeLevel0(Iterator<KeyValue> iterator) throws Exception {
+        List<DataFileMeta> files = write(iterator, 0);
+        if (files.size() > 1) {
+            throw new RuntimeException("Produce illegal multiple Level 0 files: " + files);
+        }
+        return files.size() == 0 ? Optional.empty() : Optional.of(files.get(0));
+    }
+
     /**
-     * Write several {@link KeyValue}s into a data file of a given level.
+     * Write several {@link KeyValue}s into data files of a given level.
      *
      * <p>NOTE: This method is atomic.
      */
-    public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
-            throws Exception {
+    public List<DataFileMeta> write(Iterator<KeyValue> iterator, int level) throws Exception {
+        // Don't roll file for level 0
+        long suggestedFileSize = level == 0 ? Long.MAX_VALUE : this.suggestedFileSize;
+        return doWrite(createRollingKvWriter(level, suggestedFileSize), iterator);
+    }
 
-        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
-        try (RollingKvWriter writer = rollingKvWriter) {
+    private <R> R doWrite(FileWriter<KeyValue, R> fileWriter, Iterator<KeyValue> iterator)
+            throws Exception {
+        try (FileWriter<KeyValue, R> writer = fileWriter) {
             writer.write(iterator);
-
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-
-            rollingKvWriter.abort();
+            fileWriter.abort();
             throw e;
         } finally {
-            iterator.close();
+            if (iterator instanceof AutoCloseable) {
+                ((AutoCloseable) iterator).close();

Review Comment:
   Yes, `CloseableIterator` is an `Iterator` that also implements `AutoCloseable`.
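
A minimal, self-contained sketch of the close-if-closeable pattern from the `finally` block above may help illustrate the point. The names `ClosingIterator`, `TrackingIterator`, and `consume` are hypothetical and exist only for this example; they are not part of flink-table-store. `ClosingIterator` merely stands in for an iterator type that also implements `AutoCloseable`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CloseIfCloseableSketch {

    /** Hypothetical stand-in for an iterator type that is also AutoCloseable. */
    interface ClosingIterator<T> extends Iterator<T>, AutoCloseable {}

    /** Wraps a plain iterator and records whether close() was called. */
    static final class TrackingIterator<T> implements ClosingIterator<T> {
        private final Iterator<T> delegate;
        boolean closed = false;

        TrackingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Drains the iterator, then closes it only if it implements AutoCloseable. */
    static <T> int consume(Iterator<T> iterator) throws Exception {
        int count = 0;
        try {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        } finally {
            // Same check as in the diff: a plain Iterator is left alone,
            // an Iterator that is also AutoCloseable gets closed.
            if (iterator instanceof AutoCloseable) {
                ((AutoCloseable) iterator).close();
            }
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        List<String> data = Arrays.asList("a", "b", "c");

        TrackingIterator<String> closeable = new TrackingIterator<>(data.iterator());
        System.out.println(consume(closeable) + " elements, closed=" + closeable.closed);

        // A plain iterator passes through the same method without a close() call.
        System.out.println(consume(data.iterator()) + " elements");
    }
}
```

Because the parameter type is plain `Iterator`, callers holding a closeable iterator and callers holding an ordinary one can share the same write path, and the resource is still released in the first case.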


