You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain-text version).
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/02 06:45:14 UTC

[flink-table-store] branch master updated: [FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b980af0  [FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many
b980af0 is described below

commit b980af00017a8da585882009ce629866e865f2ae
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Mar 2 14:45:10 2022 +0800

    [FLINK-26442] Allow ManifestFile to write a list of ManifestFileMeta into multiple files if too many
    
    This closes #29
---
 .../flink/table/store/file/FileStoreImpl.java      |   7 +-
 .../table/store/file/manifest/ManifestFile.java    | 113 ++++++++++++-----
 .../store/file/manifest/ManifestFileMeta.java      |  65 +++++-----
 .../store/file/mergetree/sst/SstFileWriter.java    | 133 ++++++++++-----------
 .../store/file/operation/FileStoreCommitImpl.java  |   5 +-
 .../flink/table/store/file/utils/RollingFile.java  | 125 +++++++++++++++++++
 .../store/file/manifest/ManifestFileMetaTest.java  |  77 ++++++------
 .../store/file/manifest/ManifestFileTest.java      |  46 +++++--
 .../store/file/mergetree/sst/SstFileTest.java      |  32 -----
 .../table/store/file/stats/StatsTestUtils.java     |  58 +++++++++
 10 files changed, 436 insertions(+), 225 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 2fa5c48..3249c35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -80,7 +80,12 @@ public class FileStoreImpl implements FileStore {
 
     private ManifestFile.Factory manifestFileFactory() {
         return new ManifestFile.Factory(
-                partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
+                partitionType,
+                keyType,
+                valueType,
+                options.manifestFormat(),
+                pathFactory(),
+                options.manifestTargetSize().getBytes());
     }
 
     @VisibleForTesting
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 77b3ba7..6382817 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -18,21 +18,26 @@
 
 package org.apache.flink.table.store.file.manifest;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RollingFile;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -41,23 +46,33 @@ import java.util.List;
  */
 public class ManifestFile {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
+
     private final RowType partitionType;
     private final ManifestEntrySerializer serializer;
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
     private final BulkWriter.Factory<RowData> writerFactory;
     private final FileStorePathFactory pathFactory;
+    private final long suggestedFileSize;
 
     private ManifestFile(
             RowType partitionType,
             ManifestEntrySerializer serializer,
             BulkFormat<RowData, FileSourceSplit> readerFactory,
             BulkWriter.Factory<RowData> writerFactory,
-            FileStorePathFactory pathFactory) {
+            FileStorePathFactory pathFactory,
+            long suggestedFileSize) {
         this.partitionType = partitionType;
         this.serializer = serializer;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.pathFactory = pathFactory;
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    @VisibleForTesting
+    public long suggestedFileSize() {
+        return suggestedFileSize;
     }
 
     public List<ManifestEntry> read(String fileName) {
@@ -70,34 +85,56 @@ public class ManifestFile {
     }
 
     /**
-     * Write several {@link ManifestEntry}s into a manifest file.
+     * Write several {@link ManifestEntry}s into manifest files.
      *
      * <p>NOTE: This method is atomic.
      */
-    public ManifestFileMeta write(List<ManifestEntry> entries) {
+    public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
         Preconditions.checkArgument(
                 entries.size() > 0, "Manifest entries to write must not be empty.");
 
-        Path path = pathFactory.newManifestFile();
+        ManifestRollingFile rollingFile = new ManifestRollingFile();
+        List<ManifestFileMeta> result = new ArrayList<>();
+        List<Path> filesToCleanUp = new ArrayList<>();
         try {
-            return write(entries, path);
+            rollingFile.write(entries.iterator(), result, filesToCleanUp);
         } catch (Throwable e) {
-            FileUtils.deleteOrWarn(path);
-            throw new RuntimeException(
-                    "Exception occurs when writing manifest file " + path + ". Clean up.", e);
+            LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e);
+            for (Path path : filesToCleanUp) {
+                FileUtils.deleteOrWarn(path);
+            }
+            throw new RuntimeException(e);
         }
+        return result;
+    }
+
+    public void delete(String fileName) {
+        FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
     }
 
-    private ManifestFileMeta write(List<ManifestEntry> entries, Path path) throws IOException {
-        FSDataOutputStream out =
-                path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
-        BulkWriter<RowData> writer = writerFactory.create(out);
-        long numAddedFiles = 0;
-        long numDeletedFiles = 0;
-        FieldStatsCollector statsCollector = new FieldStatsCollector(partitionType);
+    private class ManifestRollingFile extends RollingFile<ManifestEntry, ManifestFileMeta> {
+
+        private long numAddedFiles;
+        private long numDeletedFiles;
+        private FieldStatsCollector statsCollector;
+
+        private ManifestRollingFile() {
+            super(suggestedFileSize);
+            resetMeta();
+        }
 
-        for (ManifestEntry entry : entries) {
-            writer.addElement(serializer.toRow(entry));
+        @Override
+        protected Path newPath() {
+            return pathFactory.newManifestFile();
+        }
+
+        @Override
+        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
+            return writerFactory.create(out);
+        }
+
+        @Override
+        protected RowData toRowData(ManifestEntry entry) {
             switch (entry.kind()) {
                 case ADD:
                     numAddedFiles++;
@@ -107,20 +144,28 @@ public class ManifestFile {
                     break;
             }
             statsCollector.collect(entry.partition());
+
+            return serializer.toRow(entry);
         }
-        writer.finish();
-        out.close();
-
-        return new ManifestFileMeta(
-                path.getName(),
-                path.getFileSystem().getFileStatus(path).getLen(),
-                numAddedFiles,
-                numDeletedFiles,
-                statsCollector.extract());
-    }
 
-    public void delete(String fileName) {
-        FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
+        @Override
+        protected ManifestFileMeta collectFile(Path path) throws IOException {
+            ManifestFileMeta result =
+                    new ManifestFileMeta(
+                            path.getName(),
+                            path.getFileSystem().getFileStatus(path).getLen(),
+                            numAddedFiles,
+                            numDeletedFiles,
+                            statsCollector.extract());
+            resetMeta();
+            return result;
+        }
+
+        private void resetMeta() {
+            numAddedFiles = 0;
+            numDeletedFiles = 0;
+            statsCollector = new FieldStatsCollector(partitionType);
+        }
     }
 
     /**
@@ -135,13 +180,15 @@ public class ManifestFile {
         private final BulkFormat<RowData, FileSourceSplit> readerFactory;
         private final BulkWriter.Factory<RowData> writerFactory;
         private final FileStorePathFactory pathFactory;
+        private final long suggestedFileSize;
 
         public Factory(
                 RowType partitionType,
                 RowType keyType,
                 RowType rowType,
                 FileFormat fileFormat,
-                FileStorePathFactory pathFactory) {
+                FileStorePathFactory pathFactory,
+                long suggestedFileSize) {
             this.partitionType = partitionType;
             this.keyType = keyType;
             this.rowType = rowType;
@@ -149,6 +196,7 @@ public class ManifestFile {
             this.readerFactory = fileFormat.createReaderFactory(entryType);
             this.writerFactory = fileFormat.createWriterFactory(entryType);
             this.pathFactory = pathFactory;
+            this.suggestedFileSize = suggestedFileSize;
         }
 
         public ManifestFile create() {
@@ -157,7 +205,8 @@ public class ManifestFile {
                     new ManifestEntrySerializer(partitionType, keyType, rowType),
                     readerFactory,
                     writerFactory,
-                    pathFactory);
+                    pathFactory,
+                    suggestedFileSize);
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index be1d629..4ee1c6d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -27,12 +27,10 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 
 /** Metadata of a manifest file. */
 public class ManifestFileMeta {
@@ -125,56 +123,40 @@ public class ManifestFileMeta {
     }
 
     /**
-     * Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link
-     * ManifestEntry}s representing first adding and then deleting the same sst file will cancel
-     * each other.
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
+     * then deleting the same sst file will cancel each other.
      *
      * <p>NOTE: This method is atomic.
      */
     public static List<ManifestFileMeta> merge(
             List<ManifestFileMeta> metas,
-            List<ManifestEntry> entries,
             ManifestFile manifestFile,
             long suggestedMetaSize,
             int suggestedMinMetaCount) {
         List<ManifestFileMeta> result = new ArrayList<>();
         // these are the newly created manifest files, clean them up if exception occurs
         List<ManifestFileMeta> newMetas = new ArrayList<>();
-        List<ManifestFileMeta> candidate = new ArrayList<>();
+        List<ManifestFileMeta> candidates = new ArrayList<>();
         long totalSize = 0;
-        int metaCount = 0;
 
         try {
             // merge existing manifests first
             for (ManifestFileMeta manifest : metas) {
                 totalSize += manifest.fileSize;
-                metaCount += 1;
-                candidate.add(manifest);
-                if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) {
+                candidates.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
                     // reach suggested file size, perform merging and produce new file
-                    if (candidate.size() == 1) {
-                        result.add(candidate.get(0));
-                    } else {
-                        mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile)
-                                .ifPresent(
-                                        merged -> {
-                                            newMetas.add(merged);
-                                            result.add(merged);
-                                        });
-                    }
-
-                    candidate.clear();
+                    mergeCandidates(candidates, manifestFile, result, newMetas);
+                    candidates.clear();
                     totalSize = 0;
-                    metaCount = 0;
                 }
             }
 
-            // both size and count conditions not satisfied, create new file from entries
-            result.addAll(candidate);
-            if (entries.size() > 0) {
-                ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
-                newMetas.add(newManifestFileMeta);
-                result.add(newManifestFileMeta);
+            // merge the last bit of manifests if there are too many
+            if (candidates.size() >= suggestedMinMetaCount) {
+                mergeCandidates(candidates, manifestFile, result, newMetas);
+            } else {
+                result.addAll(candidates);
             }
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
@@ -187,16 +169,25 @@ public class ManifestFileMeta {
         return result;
     }
 
-    private static Optional<ManifestFileMeta> mergeIntoOneFile(
-            List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
+    private static void mergeCandidates(
+            List<ManifestFileMeta> candidates,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (candidates.size() == 1) {
+            result.add(candidates.get(0));
+            return;
+        }
+
         Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : metas) {
+        for (ManifestFileMeta manifest : candidates) {
             mergeEntries(manifestFile.read(manifest.fileName), map);
         }
-        mergeEntries(entries, map);
-        return map.isEmpty()
-                ? Optional.empty()
-                : Optional.of(manifestFile.write(new ArrayList<>(map.values())));
+        if (!map.isEmpty()) {
+            List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
+            result.addAll(merged);
+            newMetas.addAll(merged);
+        }
     }
 
     private static void mergeEntries(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
index 91e7ae3..5c18aa6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree.sst;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -32,6 +31,7 @@ import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RollingFile;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
@@ -91,40 +91,20 @@ public class SstFileWriter {
      */
     public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
+        SstRollingFile rollingFile = new StatsCollectingRollingFile(level);
         List<SstFileMeta> result = new ArrayList<>();
-
-        RollingFile rollingFile = null;
-        Path currentPath = null;
+        List<Path> filesToCleanUp = new ArrayList<>();
         try {
-            while (iterator.hasNext()) {
-                if (rollingFile == null) {
-                    currentPath = pathFactory.newPath();
-                    rollingFile = new RollingFile(currentPath, suggestedFileSize);
-                }
-                rollingFile.write(iterator.next());
-                if (rollingFile.exceedsSuggestedFileSize()) {
-                    result.add(rollingFile.finish(level));
-                    rollingFile = null;
-                }
-            }
-            // finish last file
-            if (rollingFile != null) {
-                result.add(rollingFile.finish(level));
-            }
-            iterator.close();
+            rollingFile.write(iterator, result, filesToCleanUp);
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing sst files. Cleaning up.", e);
-            // clean up finished files
-            for (SstFileMeta meta : result) {
-                FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName()));
-            }
-            // clean up in-progress file
-            if (currentPath != null) {
-                FileUtils.deleteOrWarn(currentPath);
+            for (Path path : filesToCleanUp) {
+                FileUtils.deleteOrWarn(path);
             }
             throw e;
+        } finally {
+            iterator.close();
         }
-
         return result;
     }
 
@@ -132,12 +112,9 @@ public class SstFileWriter {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private class RollingFile {
-        private final Path path;
-        private final long suggestedFileSize;
+    private abstract class SstRollingFile extends RollingFile<KeyValue, SstFileMeta> {
 
-        private final FSDataOutputStream out;
-        private final BulkWriter<RowData> writer;
+        private final int level;
         private final KeyValueSerializer serializer;
         private final RowDataSerializer keySerializer;
 
@@ -147,37 +124,30 @@ public class SstFileWriter {
         private long minSequenceNumber;
         private long maxSequenceNumber;
 
-        private RollingFile(Path path, long suggestedFileSize) throws IOException {
-            this.path = path;
-            this.suggestedFileSize = suggestedFileSize;
-
-            this.out =
-                    this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE);
-            this.writer = writerFactory.create(out);
+        private SstRollingFile(int level) {
+            super(suggestedFileSize);
+            this.level = level;
             this.serializer = new KeyValueSerializer(keyType, valueType);
             this.keySerializer = new RowDataSerializer(keyType);
+            resetMeta();
+        }
 
-            this.rowCount = 0;
-            this.minKey = null;
-            this.maxKey = null;
-            this.minSequenceNumber = Long.MAX_VALUE;
-            this.maxSequenceNumber = Long.MIN_VALUE;
+        @Override
+        protected Path newPath() {
+            return pathFactory.newPath();
+        }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Creating new sst file " + path);
-            }
+        @Override
+        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
+            return writerFactory.create(out);
         }
 
-        private void write(KeyValue kv) throws IOException {
+        @Override
+        protected RowData toRowData(KeyValue kv) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Writing key-value to sst file "
-                                + path
-                                + ", kv: "
-                                + kv.toString(keyType, valueType));
+                LOG.debug("Writing key-value to sst file, kv: " + kv.toString(keyType, valueType));
             }
 
-            writer.addElement(serializer.toRow(kv));
             rowCount++;
             if (minKey == null) {
                 minKey = keySerializer.toBinaryRow(kv.key()).copy();
@@ -185,17 +155,46 @@ public class SstFileWriter {
             maxKey = kv.key();
             minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
             maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+
+            return serializer.toRow(kv);
         }
 
-        private boolean exceedsSuggestedFileSize() throws IOException {
-            // NOTE: this method is inaccurate for formats buffering changes in memory
-            return out.getPos() >= suggestedFileSize;
+        @Override
+        protected SstFileMeta collectFile(Path path) throws IOException {
+            SstFileMeta result =
+                    new SstFileMeta(
+                            path.getName(),
+                            FileUtils.getFileSize(path),
+                            rowCount,
+                            minKey,
+                            keySerializer.toBinaryRow(maxKey).copy(),
+                            collectStats(path),
+                            minSequenceNumber,
+                            maxSequenceNumber,
+                            level);
+            resetMeta();
+            return result;
         }
 
-        private SstFileMeta finish(int level) throws IOException {
-            writer.finish();
-            out.close();
+        private void resetMeta() {
+            rowCount = 0;
+            minKey = null;
+            maxKey = null;
+            minSequenceNumber = Long.MAX_VALUE;
+            maxSequenceNumber = Long.MIN_VALUE;
+        }
 
+        protected abstract FieldStats[] collectStats(Path path);
+    }
+
+    private class StatsCollectingRollingFile extends SstRollingFile {
+
+        private StatsCollectingRollingFile(int level) {
+            super(level);
+        }
+
+        @Override
+        protected FieldStats[] collectStats(Path path) {
             // TODO
             //  1. Read statistics directly from the written orc/parquet files.
             //  2. For other file formats use StatsCollector. Make sure fields are not reused
@@ -204,17 +203,7 @@ public class SstFileWriter {
             for (int i = 0; i < stats.length; i++) {
                 stats[i] = new FieldStats(null, null, 0);
             }
-
-            return new SstFileMeta(
-                    path.getName(),
-                    FileUtils.getFileSize(path),
-                    rowCount,
-                    minKey,
-                    keySerializer.toBinaryRow(maxKey).copy(),
-                    stats,
-                    minSequenceNumber,
-                    maxSequenceNumber,
-                    level);
+            return stats;
         }
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index d8f8211..1bfdf1b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -313,10 +313,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             newMetas.addAll(
                     ManifestFileMeta.merge(
                             oldMetas,
-                            changes,
                             manifestFile,
                             manifestTargetSize.getBytes(),
                             manifestMergeMinCount));
+            // write new changes into manifest files
+            if (!changes.isEmpty()) {
+                newMetas.addAll(manifestFile.write(changes));
+            }
             // prepare snapshot file
             manifestListName = manifestList.write(newMetas);
             newSnapshot =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
new file mode 100644
index 0000000..aeb0d90
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A utility class to write a list of objects into several files, each with a size limit.
+ *
+ * @param <R> record type
+ * @param <F> file meta type
+ */
+public abstract class RollingFile<R, F> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RollingFile.class);
+
+    private final long suggestedFileSize;
+
+    public RollingFile(long suggestedFileSize) {
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    /** Create the path for a new file. */
+    protected abstract Path newPath();
+
+    /** Create a new object writer. Called per file. */
+    protected abstract BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException;
+
+    /**
+     * Called before writing a record into file. Per-record calculation can be performed here.
+     *
+     * @param record record to write
+     * @return serialized record
+     */
+    protected abstract RowData toRowData(R record);
+
+    /** Called before closing the current file. Per-file calculation can be performed here. */
+    protected abstract F collectFile(Path path) throws IOException;
+
+    public void write(Iterator<R> iterator, List<F> result, List<Path> filesToCleanUp)
+            throws IOException {
+        Writer writer = null;
+        Path currentPath = null;
+
+        while (iterator.hasNext()) {
+            if (writer == null) {
+                // create new rolling file
+                currentPath = newPath();
+                filesToCleanUp.add(currentPath);
+                writer = new Writer(currentPath);
+            }
+
+            RowData serialized = toRowData(iterator.next());
+            writer.write(serialized);
+
+            if (writer.exceedsSuggestedFileSize()) {
+                // exceeds suggested size, close current file
+                writer.finish();
+                result.add(collectFile(currentPath));
+                writer = null;
+            }
+        }
+
+        // finish last file
+        if (writer != null) {
+            writer.finish();
+            result.add(collectFile(currentPath));
+        }
+    }
+
+    private class Writer {
+        private final FSDataOutputStream out;
+        private final BulkWriter<RowData> writer;
+
+        private Writer(Path path) throws IOException {
+            this.out = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+            this.writer = newWriter(out);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Create new rolling file " + path.toString());
+            }
+        }
+
+        private void write(RowData record) throws IOException {
+            writer.addElement(record);
+        }
+
+        private boolean exceedsSuggestedFileSize() throws IOException {
+            // NOTE: this method is inaccurate for formats buffering changes in memory
+            return out.getPos() >= suggestedFileSize;
+        }
+
+        private void finish() throws IOException {
+            writer.finish();
+            out.close();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8bf995f..9296a31 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -35,15 +35,18 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -70,30 +73,28 @@ public class ManifestFileMetaTest {
         manifestFile = createManifestFile(tempDir.toString());
     }
 
-    @Test
-    public void testMerge() {
+    /** Merges fixed inputs plus {@code numLastBits} single-entry "last bit" manifests. */
+    @ParameterizedTest
+    @ValueSource(ints = {2, 3, 4})
+    public void testMerge(int numLastBits) {
         List<ManifestFileMeta> input = new ArrayList<>();
-        List<ManifestEntry> entries = new ArrayList<>();
         List<ManifestFileMeta> expected = new ArrayList<>();
-        createData(input, entries, expected);
+        createData(numLastBits, input, expected);

-        List<ManifestFileMeta> actual =
-                ManifestFileMeta.merge(input, entries, manifestFile, 500, 3);
+        List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, manifestFile, 500, 3);
         assertThat(actual).hasSameSizeAs(expected);
 
-        // these three manifest files are merged from the input
+        // merged from inputs 0-1 (sizes 200 + 300) and inputs 2-4 (sizes 100 + 200 + 300)
         assertSameContent(expected.get(0), actual.get(0), manifestFile);
         assertSameContent(expected.get(1), actual.get(1), manifestFile);
-        assertSameContent(expected.get(4), actual.get(4), manifestFile);
 
-        // these four manifest files should be kept without modification
+        // inputs 5 (size 500) and 6 (size 600) already reach the suggested size; kept as-is
         assertThat(actual.get(2)).isEqualTo(input.get(5));
         assertThat(actual.get(3)).isEqualTo(input.get(6));
-        assertThat(actual.get(5)).isEqualTo(input.get(10));
-        assertThat(actual.get(6)).isEqualTo(input.get(11));
 
-        // this manifest file should be created from entries
-        assertSameContent(expected.get(7), actual.get(7), manifestFile);
+        // last bits: kept as-is when fewer than the suggested count (3), otherwise merged into one
+        for (int i = 4; i < actual.size(); i++) {
+            assertSameContent(expected.get(i), actual.get(i), manifestFile);
+        }
     }
 
     private void assertSameContent(
@@ -112,14 +113,13 @@ public class ManifestFileMetaTest {
     public void testCleanUpForException() throws IOException {
         FailingAtomicRenameFileSystem.get().reset(1, 10);
         List<ManifestFileMeta> input = new ArrayList<>();
-        List<ManifestEntry> entries = new ArrayList<>();
-        createData(input, entries, null);
+        createData(ThreadLocalRandom.current().nextInt(5), input, null);
         ManifestFile failingManifestFile =
                 createManifestFile(
                         FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
         try {
-            ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
+            ManifestFileMeta.merge(input, failingManifestFile, 500, 3);
         } catch (Throwable e) {
             assertThat(e)
                     .hasRootCauseExactlyInstanceOf(
@@ -146,24 +146,20 @@ public class ManifestFileMetaTest {
                         KEY_TYPE,
                         ROW_TYPE,
                         avro,
-                        new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"))
+                        new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"),
+                        Long.MAX_VALUE)
                 .create();
     }
 
     private void createData(
-            List<ManifestFileMeta> input,
-            List<ManifestEntry> entries,
-            List<ManifestFileMeta> expected) {
+            int numLastBits, List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
         // suggested size 500 and suggested count 3
         // file sizes:
         // 200, 300, -- multiple files exactly the suggested size
         // 100, 200, 300, -- multiple files exceeding the suggested size
         // 500, -- single file exactly the suggested size
         // 600, -- single file exceeding the suggested size
-        // 100, 100, 100, -- multiple files exceeding the suggested count
-        // 100, -- the last bit, not enough size or count, won't merge
-        // 300, -- the last bit, not enough size or count, won't merge
-        // 200, -- file created from entries
+        // 100 * numLastBits -- the last bit
 
         input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
         input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -189,15 +185,9 @@ public class ManifestFileMetaTest {
                         makeEntry(false, "K"),
                         makeEntry(true, "L")));
 
-        input.add(makeManifest(makeEntry(true, "M")));
-        input.add(makeManifest(makeEntry(true, "N")));
-        input.add(makeManifest(makeEntry(true, "O")));
-
-        input.add(makeManifest(makeEntry(true, "P")));
-        input.add(makeManifest(makeEntry(false, "Q"), makeEntry(true, "R"), makeEntry(true, "S")));
-
-        entries.add(makeEntry(false, "S"));
-        entries.add(makeEntry(true, "T"));
+        for (int i = 0; i < numLastBits; i++) {
+            input.add(makeManifest(makeEntry(true, String.valueOf(i))));
+        }
 
         if (expected == null) {
             return;
@@ -208,15 +198,22 @@ public class ManifestFileMetaTest {
         expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
         expected.add(input.get(5));
         expected.add(input.get(6));
-        expected.add(
-                makeManifest(makeEntry(true, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
-        expected.add(input.get(10));
-        expected.add(input.get(11));
-        expected.add(makeManifest(makeEntry(false, "S"), makeEntry(true, "T")));
+
+        if (numLastBits < 3) {
+            for (int i = 0; i < numLastBits; i++) {
+                expected.add(input.get(7 + i));
+            }
+        } else {
+            expected.add(
+                    makeManifest(
+                            IntStream.range(0, numLastBits)
+                                    .mapToObj(i -> makeEntry(true, String.valueOf(i)))
+                                    .toArray(ManifestEntry[]::new)));
+        }
     }
 
     private ManifestFileMeta makeManifest(ManifestEntry... entries) {
-        ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries));
+        ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries)).get(0);
         return new ManifestFileMeta(
                 writtenMeta.fileName(),
                 entries.length * 100, // for testing purpose
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index aa65636..f1c8c78 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
@@ -32,6 +34,8 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -51,10 +55,12 @@ public class ManifestFileTest {
         ManifestFileMeta meta = gen.createManifestFileMeta(entries);
         ManifestFile manifestFile = createManifestFile(tempDir.toString());
 
-        ManifestFileMeta actualMeta = manifestFile.write(entries);
-        // we do not check file name and size as we can't know in advance
-        checkMetaIgnoringFileNameAndSize(meta, actualMeta);
-        List<ManifestEntry> actualEntries = manifestFile.read(actualMeta.fileName());
+        List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
+        checkRollingFiles(meta, actualMetas, manifestFile.suggestedFileSize());
+        List<ManifestEntry> actualEntries =
+                actualMetas.stream()
+                        .flatMap(m -> manifestFile.read(m.fileName()).stream())
+                        .collect(Collectors.toList());
         assertThat(actualEntries).isEqualTo(entries);
     }
 
@@ -90,19 +96,39 @@ public class ManifestFileTest {
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
                         new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
+        int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
         return new ManifestFile.Factory(
                         TestKeyValueGenerator.PARTITION_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
                         avro,
-                        pathFactory)
+                        pathFactory,
+                        suggestedFileSize)
                 .create();
     }
 
-    private void checkMetaIgnoringFileNameAndSize(
-            ManifestFileMeta expected, ManifestFileMeta actual) {
-        assertThat(actual.numAddedFiles()).isEqualTo(expected.numAddedFiles());
-        assertThat(actual.numDeletedFiles()).isEqualTo(expected.numDeletedFiles());
-        assertThat(actual.partitionStats()).isEqualTo(expected.partitionStats());
+    /**
+     * Checks the metas produced by a (possibly rolling) manifest write against {@code expected}, a
+     * single meta describing all written entries.
+     *
+     * @param expected meta describing all entries at once
+     * @param actual metas of the files returned by {@code ManifestFile#write}
+     * @param suggestedFileSize size at which the manifest writer is expected to roll to a new file
+     */
+    private void checkRollingFiles(
+            ManifestFileMeta expected, List<ManifestFileMeta> actual, long suggestedFileSize) {
+        // every file except the last should have reached the suggested size before rolling;
+        // asserting on the size itself (not on a boolean) gives a meaningful failure message
+        for (int i = 0; i + 1 < actual.size(); i++) {
+            assertThat(actual.get(i).fileSize()).isGreaterThanOrEqualTo(suggestedFileSize);
+        }
+
+        // added / deleted file counts are split across the files but must sum up to the expected
+        assertThat(actual.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum())
+                .isEqualTo(expected.numAddedFiles());
+        assertThat(actual.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum())
+                .isEqualTo(expected.numDeletedFiles());
+
+        // for each partition field, the per-file stats must combine into the expected stats
+        for (int i = 0; i < expected.partitionStats().length; i++) {
+            List<FieldStats> actualStats = new ArrayList<>();
+            for (ManifestFileMeta meta : actual) {
+                actualStats.add(meta.partitionStats()[i]);
+            }
+            StatsTestUtils.checkRollingFileStats(expected.partitionStats()[i], actualStats);
+        }
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index e5bde46..00f44e7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializerTest;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
@@ -297,35 +296,4 @@ public class SstFileTest {
             assertThat(meta.level()).isEqualTo(expected.level());
         }
     }
-
-    @SuppressWarnings("unchecked")
-    private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
-        if (expected.minValue() instanceof Comparable) {
-            Object actualMin = null;
-            Object actualMax = null;
-            for (FieldStats stats : actual) {
-                if (stats.minValue() != null
-                        && (actualMin == null
-                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
-                                        < 0)) {
-                    actualMin = stats.minValue();
-                }
-                if (stats.maxValue() != null
-                        && (actualMax == null
-                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
-                                        > 0)) {
-                    actualMax = stats.maxValue();
-                }
-            }
-            assertThat(actualMin).isEqualTo(expected.minValue());
-            assertThat(actualMax).isEqualTo(expected.maxValue());
-        } else {
-            for (FieldStats stats : actual) {
-                assertThat(stats.minValue()).isNull();
-                assertThat(stats.maxValue()).isNull();
-            }
-        }
-        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
-                .isEqualTo(expected.nullCount());
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
new file mode 100644
index 0000000..379b600
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Utils for stats related tests. */
+public class StatsTestUtils {
+
+    @SuppressWarnings("unchecked")
+    public static void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+        if (expected.minValue() instanceof Comparable) {
+            Object actualMin = null;
+            Object actualMax = null;
+            for (FieldStats stats : actual) {
+                if (stats.minValue() != null
+                        && (actualMin == null
+                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
+                                        < 0)) {
+                    actualMin = stats.minValue();
+                }
+                if (stats.maxValue() != null
+                        && (actualMax == null
+                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
+                                        > 0)) {
+                    actualMax = stats.maxValue();
+                }
+            }
+            assertThat(actualMin).isEqualTo(expected.minValue());
+            assertThat(actualMax).isEqualTo(expected.maxValue());
+        } else {
+            for (FieldStats stats : actual) {
+                assertThat(stats.minValue()).isNull();
+                assertThat(stats.maxValue()).isNull();
+            }
+        }
+        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
+                .isEqualTo(expected.nullCount());
+    }
+}