Posted to commits@flink.apache.org by ja...@apache.org on 2023/01/17 09:56:09 UTC

[flink] branch master updated: [FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b5f465036d2 [FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)
b5f465036d2 is described below

commit b5f465036d28ed422a1218bfb8549cbf3e6ecd2e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Jan 17 17:55:57 2023 +0800

    [FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)
---
 .../file/table/DynamicPartitionWriter.java         |  15 +-
 .../connector/file/table/FileSystemCommitter.java  |  41 ++++-
 .../file/table/FileSystemOutputFormat.java         |   6 +-
 .../file/table/GroupedPartitionWriter.java         |  15 +-
 .../connector/file/table/PartitionLoader.java      |  73 ++++++---
 .../file/table/PartitionTempFileManager.java       |  23 ++-
 .../connector/file/table/PartitionWriter.java      |  24 +++
 .../file/table/PartitionWriterFactory.java         |   9 +-
 .../file/table/SingleDirectoryWriter.java          |  30 ++--
 .../batch/compact/BatchCompactCoordinator.java     | 153 ++++++++++++++++++
 .../table/batch/compact/BatchCompactOperator.java  | 150 +++++++++++++++++
 .../file/table/batch/compact/BatchFileWriter.java  | 134 +++++++++++++++
 .../batch/compact/BatchPartitionCommitterSink.java | 145 +++++++++++++++++
 .../file/table/batch/compact/CompactFileUtils.java | 121 ++++++++++++++
 .../file/table/stream/compact/CompactMessages.java |  20 +++
 .../file/table/FileSystemCommitterTest.java        |   7 +-
 .../batch/compact/BatchCompactCoordinatorTest.java | 165 +++++++++++++++++++
 .../batch/compact/BatchCompactOperatorTest.java    | 133 +++++++++++++++
 .../table/batch/compact/BatchFileWriterTest.java   | 179 +++++++++++++++++++++
 .../compact/BatchPartitionCommitterSinkTest.java   | 137 ++++++++++++++++
 .../stream/compact/AbstractCompactTestBase.java    |   4 +-
 21 files changed, 1535 insertions(+), 49 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
index c33db2ffdd3..20432a54156 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,13 +38,23 @@ public class DynamicPartitionWriter<T> implements PartitionWriter<T> {
     private final PartitionTempFileManager manager;
     private final PartitionComputer<T> computer;
     private final Map<String, OutputFormat<T>> formats;
+    private final PartitionWriterListener writerListener;
 
     public DynamicPartitionWriter(
             Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer) {
+        this(context, manager, computer, new DefaultPartitionWriterListener());
+    }
+
+    public DynamicPartitionWriter(
+            Context<T> context,
+            PartitionTempFileManager manager,
+            PartitionComputer<T> computer,
+            PartitionWriterListener writerListener) {
         this.context = context;
         this.manager = manager;
         this.computer = computer;
         this.formats = new HashMap<>();
+        this.writerListener = writerListener;
     }
 
     @Override
@@ -53,8 +64,10 @@ public class DynamicPartitionWriter<T> implements PartitionWriter<T> {
 
         if (format == null) {
             // create a new format to write new partition.
-            format = context.createNewOutputFormat(manager.createPartitionDir(partition));
+            Path path = manager.createPartitionDir(partition);
+            format = context.createNewOutputFormat(path);
             formats.put(partition, format);
+            writerListener.onFileOpened(partition, path);
         }
         format.writeRecord(computer.projectColumnsToWrite(in));
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
index c63bea9e3fd..2e63117fcd5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
@@ -23,12 +23,14 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.connector.file.table.PartitionTempFileManager.collectPartSpecToPaths;
 import static org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /**
  * File system file committer implementation. It moves all files to output path from temporary path.
@@ -48,7 +50,7 @@ import static org.apache.flink.connector.file.table.PartitionTempFileManager.lis
  * <p>See: {@link PartitionTempFileManager}. {@link PartitionLoader}.
  */
 @Internal
-class FileSystemCommitter {
+public class FileSystemCommitter {
 
     private final FileSystemFactory factory;
     private final TableMetaStoreFactory metaStoreFactory;
@@ -60,7 +62,7 @@ class FileSystemCommitter {
     private final LinkedHashMap<String, String> staticPartitions;
     private final List<PartitionCommitPolicy> policies;
 
-    FileSystemCommitter(
+    public FileSystemCommitter(
             FileSystemFactory factory,
             TableMetaStoreFactory metaStoreFactory,
             boolean overwrite,
@@ -97,11 +99,11 @@ class FileSystemCommitter {
                 } else {
                     for (Map.Entry<LinkedHashMap<String, String>, List<Path>> entry :
                             collectPartSpecToPaths(fs, taskPaths, partitionColumnSize).entrySet()) {
-                        loader.loadPartition(entry.getKey(), entry.getValue());
+                        loader.loadPartition(entry.getKey(), entry.getValue(), true);
                     }
                 }
             } else {
-                loader.loadNonPartition(taskPaths);
+                loader.loadNonPartition(taskPaths, true);
             }
         } finally {
             for (Path taskPath : taskPaths) {
@@ -109,4 +111,35 @@ class FileSystemCommitter {
             }
         }
     }
+
+    /**
+     * Commits the job's output after a successful batch job completion. It commits the given
+     * partitions and the corresponding written files, which means it moves the temporary files
+     * to the partitions' locations.
+     */
+    public void commitPartitionsWithFiles(Map<String, List<Path>> partitionsFiles)
+            throws Exception {
+        FileSystem fs = factory.create(tmpPath.toUri());
+        try (PartitionLoader loader =
+                new PartitionLoader(
+                        overwrite, fs, metaStoreFactory, isToLocal, identifier, policies)) {
+            if (partitionColumnSize > 0) {
+                if (partitionsFiles.isEmpty() && !staticPartitions.isEmpty()) {
+                    if (partitionColumnSize == staticPartitions.size()) {
+                        loader.loadEmptyPartition(this.staticPartitions);
+                    }
+                } else {
+                    for (Map.Entry<String, List<Path>> partitionFile : partitionsFiles.entrySet()) {
+                        LinkedHashMap<String, String> partSpec =
+                                extractPartitionSpecFromPath(new Path(partitionFile.getKey()));
+                        loader.loadPartition(partSpec, partitionFile.getValue(), false);
+                    }
+                }
+            } else {
+                List<Path> files = new ArrayList<>();
+                partitionsFiles.values().forEach(files::addAll);
+                loader.loadNonPartition(files, false);
+            }
+        }
+    }
 }
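For illustration only (not part of this patch): a minimal sketch of calling the new commitPartitionsWithFiles(...) API once compaction has produced concrete files. The committer construction is assumed to use the constructor made public above; the class name, partition key, and file paths below are hypothetical.

import org.apache.flink.connector.file.table.FileSystemCommitter;
import org.apache.flink.core.fs.Path;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CommitExample {
    static void commitCompacted(FileSystemCommitter committer) throws Exception {
        Map<String, List<Path>> partitionsFiles = new HashMap<>();
        // key: a partition path (its spec is re-extracted via extractPartitionSpecFromPath)
        // value: the concrete files to move into that partition's location
        partitionsFiles.put(
                "dt=2023-01-17",
                Collections.singletonList(
                        new Path("/tmp/staging/task-0-attempt-0/dt=2023-01-17/compacted-attempt-0-f0")));
        committer.commitPartitionsWithFiles(partitionsFiles);
    }
}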
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 23768755ec5..22aac83ddb1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -148,7 +148,11 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
                                     partitionColumns.length - staticPartitions.size() > 0,
                                     dynamicGrouped,
                                     staticPartitions)
-                            .create(context, fileManager, computer);
+                            .create(
+                                    context,
+                                    fileManager,
+                                    computer,
+                                    new PartitionWriter.DefaultPartitionWriterListener());
         } catch (Exception e) {
             throw new TableException("Exception in open", e);
         }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
index 2ab92c92fee..7aadfc7495e 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
 
 import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
 
@@ -35,15 +36,25 @@ public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
     private final Context<T> context;
     private final PartitionTempFileManager manager;
     private final PartitionComputer<T> computer;
+    private final PartitionWriterListener writerListener;
 
     private OutputFormat<T> currentFormat;
     private String currentPartition;
 
     public GroupedPartitionWriter(
             Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer) {
+        this(context, manager, computer, new DefaultPartitionWriterListener());
+    }
+
+    public GroupedPartitionWriter(
+            Context<T> context,
+            PartitionTempFileManager manager,
+            PartitionComputer<T> computer,
+            PartitionWriterListener writerListener) {
         this.context = context;
         this.manager = manager;
         this.computer = computer;
+        this.writerListener = writerListener;
     }
 
     @Override
@@ -54,7 +65,9 @@ public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
                 currentFormat.close();
             }
 
-            currentFormat = context.createNewOutputFormat(manager.createPartitionDir(partition));
+            Path path = manager.createPartitionDir(partition);
+            currentFormat = context.createNewOutputFormat(path);
+            writerListener.onFileOpened(partition, path);
             currentPartition = partition;
         }
         currentFormat.writeRecord(computer.projectColumnsToWrite(in));
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
index 4ff1a9c1f4d..74f730ae7ac 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
@@ -75,8 +75,18 @@ public class PartitionLoader implements Closeable {
         this.policies = policies;
     }
 
-    /** Load a single partition. */
-    public void loadPartition(LinkedHashMap<String, String> partSpec, List<Path> srcDirs)
+    /**
+     * Load a single partition.
+     *
+     * @param partSpec the specification for the single partition
+     * @param srcPaths the paths of the files to load into the single partition
+     * @param srcPathIsDir whether every path in {@code srcPaths} is a directory. If true, the
+     *     files under the directory of every path will be loaded. If false, every path in
+     *     {@code srcPaths} is considered a single file, and the single file will be loaded for
+     *     every path.
+     */
+    public void loadPartition(
+            LinkedHashMap<String, String> partSpec, List<Path> srcPaths, boolean srcPathIsDir)
             throws Exception {
         Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
         Path path =
@@ -86,14 +96,22 @@ public class PartitionLoader implements Closeable {
                                         metaStore.getLocationPath(),
                                         generatePartitionPath(partSpec)));
 
-        overwriteAndMoveFiles(srcDirs, path);
+        overwriteAndMoveFiles(srcPaths, path, srcPathIsDir);
         commitPartition(partSpec, path);
     }
 
-    /** Load a non-partition files to output path. */
-    public void loadNonPartition(List<Path> srcDirs) throws Exception {
+    /**
+     * Load non-partitioned files to the output path.
+     *
+     * @param srcPaths the paths of the files to load to the output path
+     * @param srcPathIsDir whether every path in {@code srcPaths} is a directory. If true, the
+     *     files under the directory of every path will be loaded. If false, every path in
+     *     {@code srcPaths} is considered a single file, and the single file will be loaded for
+     *     every path.
+     */
+    public void loadNonPartition(List<Path> srcPaths, boolean srcPathIsDir) throws Exception {
         Path tableLocation = metaStore.getLocationPath();
-        overwriteAndMoveFiles(srcDirs, tableLocation);
+        overwriteAndMoveFiles(srcPaths, tableLocation, srcPathIsDir);
         commitPartition(new LinkedHashMap<>(), tableLocation);
         metaStore.finishWritingTable(tableLocation);
     }
@@ -125,12 +143,13 @@ public class PartitionLoader implements Closeable {
         commitPartition(partSpec, path);
     }
 
-    private void overwriteAndMoveFiles(List<Path> srcDirs, Path destDir) throws Exception {
+    private void overwriteAndMoveFiles(List<Path> srcPaths, Path destDir, boolean srcPathIsDir)
+            throws Exception {
         FileSystem destFileSystem = destDir.getFileSystem();
         boolean dirSuccessExist = destFileSystem.exists(destDir) || destFileSystem.mkdirs(destDir);
         Preconditions.checkState(dirSuccessExist, "Failed to create dest path " + destDir);
         overwrite(destDir);
-        moveFiles(srcDirs, destDir);
+        moveFiles(srcPaths, destDir, srcPathIsDir);
     }
 
     private void overwrite(Path destDir) throws Exception {
@@ -148,23 +167,35 @@ public class PartitionLoader implements Closeable {
     }
 
     /** Moves files from srcDir to destDir. */
-    private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
-        for (Path srcDir : srcDirs) {
-            if (!srcDir.equals(destDir)) {
-                FileStatus[] srcFiles = listStatusWithoutHidden(fs, srcDir);
-                if (srcFiles != null) {
-                    for (FileStatus srcFile : srcFiles) {
-                        Path srcPath = srcFile.getPath();
-                        Path destPath = new Path(destDir, srcPath.getName());
-                        // if it's not to move to local file system, just rename it
-                        if (!isToLocal) {
-                            fs.rename(srcPath, destPath);
-                        } else {
-                            FileUtils.copy(srcPath, destPath, true);
+    private void moveFiles(List<Path> srcPaths, Path destDir, boolean srcPathIsDir)
+            throws Exception {
+        if (srcPathIsDir) {
+            // if the src path is still a directory, list the directory to get the files that need
+            // to be moved.
+            for (Path srcDir : srcPaths) {
+                if (!srcDir.equals(destDir)) {
+                    FileStatus[] srcFiles = listStatusWithoutHidden(fs, srcDir);
+                    if (srcFiles != null) {
+                        for (FileStatus srcFile : srcFiles) {
+                            moveFile(srcFile.getPath(), destDir);
                         }
                     }
                 }
             }
+        } else {
+            for (Path srcPath : srcPaths) {
+                moveFile(srcPath, destDir);
+            }
+        }
+    }
+
+    private void moveFile(Path srcPath, Path destDir) throws Exception {
+        Path destPath = new Path(destDir, srcPath.getName());
+        // if it's not to move to local file system, just rename it
+        if (!isToLocal) {
+            fs.rename(srcPath, destPath);
+        } else {
+            FileUtils.copy(srcPath, destPath, true);
         }
     }
 
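For illustration only (not part of this patch): a minimal sketch of the two srcPathIsDir modes of the reworked loading methods, assuming a PartitionLoader built as above; the class name and staging paths below are hypothetical.

import org.apache.flink.connector.file.table.PartitionLoader;
import org.apache.flink.core.fs.Path;

import java.util.Collections;

class LoadExample {
    static void load(PartitionLoader loader) throws Exception {
        // srcPathIsDir = true: each path is a task directory; its non-hidden files are moved
        loader.loadNonPartition(
                Collections.singletonList(new Path("/tmp/staging/task-0-attempt-0")), true);
        // srcPathIsDir = false: each path is already a concrete file and is moved as-is
        loader.loadNonPartition(
                Collections.singletonList(new Path("/tmp/staging/compacted-attempt-0-f0")), false);
    }
}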
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
index 5672fd51914..14bf3a30c81 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
@@ -49,6 +49,7 @@ public class PartitionTempFileManager {
     private static final Logger LOG = LoggerFactory.getLogger(PartitionTempFileManager.class);
 
     private static final String TASK_DIR_PREFIX = "task-";
+    private static final String ATTEMPT_PREFIX = "attempt-";
 
     private final int taskNumber;
     private final Path taskTmpDir;
@@ -56,7 +57,7 @@ public class PartitionTempFileManager {
 
     private transient int nameCounter = 0;
 
-    PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
+    public PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
             throws IOException {
         this(factory, tmpPath, taskNumber, new OutputFileConfig("", ""));
     }
@@ -75,6 +76,24 @@ public class PartitionTempFileManager {
         factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
     }
 
+    public PartitionTempFileManager(
+            FileSystemFactory factory,
+            Path tmpPath,
+            int taskNumber,
+            int attemptNumber,
+            OutputFileConfig outputFileConfig)
+            throws IOException {
+        this.taskNumber = taskNumber;
+        this.outputFileConfig = outputFileConfig;
+
+        // generate task temp dir with task and attempt number like "task-0-attempt-0"
+        String taskTmpDirName =
+                String.format(
+                        "%s%d-%s%d", TASK_DIR_PREFIX, taskNumber, ATTEMPT_PREFIX, attemptNumber);
+        this.taskTmpDir = new Path(tmpPath, taskTmpDirName);
+        factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
+    }
+
     /** Generate a new partition directory with partitions. */
     public Path createPartitionDir(String... partitions) {
         Path parentPath = taskTmpDir;
@@ -120,7 +139,7 @@ public class PartitionTempFileManager {
     }
 
     /** Collect all partitioned paths, aggregate according to partition spec. */
-    static Map<LinkedHashMap<String, String>, List<Path>> collectPartSpecToPaths(
+    public static Map<LinkedHashMap<String, String>, List<Path>> collectPartSpecToPaths(
             FileSystem fs, List<Path> taskPaths, int partColSize) {
         Map<LinkedHashMap<String, String>, List<Path>> specToPaths = new HashMap<>();
         for (Path taskPath : taskPaths) {
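For illustration only: the attempt-aware temporary directory naming introduced by the new constructor above. The class name, base path, and task/attempt numbers are assumptions for the example.

import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.PartitionTempFileManager;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

class TempDirExample {
    static PartitionTempFileManager create(FileSystemFactory factory) throws Exception {
        // task 3, attempt 1 -> task temp dir "/tmp/staging/task-3-attempt-1"
        return new PartitionTempFileManager(
                factory, new Path("/tmp/staging"), 3, 1, new OutputFileConfig("", ""));
    }
}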
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
index bf057cf28d4..e1c9c63d0c1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
@@ -62,4 +62,28 @@ public interface PartitionWriter<T> {
             return format;
         }
     }
+
+    /** Listener for partition writer. */
+    interface PartitionWriterListener {
+
+        /**
+         * Notifies that a new file has been opened.
+         *
+         * <p>Note that this does not mean that the file has been created in the file system. It is
+         * only created logically and the actual file will be generated after it is committed.
+         *
+         * @param partition The partition for the newly opened file.
+         * @param file The newly created file.
+         */
+        void onFileOpened(String partition, Path file);
+    }
+
+    /** Default implementation for PartitionWriterListener. */
+    class DefaultPartitionWriterListener implements PartitionWriterListener {
+
+        @Override
+        public void onFileOpened(String partition, Path file) {
+            // do nothing
+        }
+    }
 }
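For illustration only: a listener implementing the new hook, collecting every opened file per partition. The class name is hypothetical; BatchFileWriter later in this patch uses an equivalent lambda to emit InputFile messages.

import org.apache.flink.connector.file.table.PartitionWriter;
import org.apache.flink.core.fs.Path;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CollectingListener implements PartitionWriter.PartitionWriterListener {
    // partition -> files opened for that partition
    final Map<String, List<Path>> openedFiles = new HashMap<>();

    @Override
    public void onFileOpened(String partition, Path file) {
        openedFiles.computeIfAbsent(partition, k -> new ArrayList<>()).add(file);
    }
}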
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
index 597595b3801..0eedec0b4f3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
@@ -29,7 +29,10 @@ import java.util.LinkedHashMap;
 public interface PartitionWriterFactory<T> extends Serializable {
 
     PartitionWriter<T> create(
-            Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer)
+            Context<T> context,
+            PartitionTempFileManager manager,
+            PartitionComputer<T> computer,
+            PartitionWriter.PartitionWriterListener writerListener)
             throws Exception;
 
     /** Util for get a {@link PartitionWriterFactory}. */
@@ -41,9 +44,9 @@ public interface PartitionWriterFactory<T> extends Serializable {
             return grouped ? GroupedPartitionWriter::new : DynamicPartitionWriter::new;
         } else {
             return (PartitionWriterFactory<T>)
-                    (context, manager, computer) ->
+                    (context, manager, computer, writerListener) ->
                             new SingleDirectoryWriter<>(
-                                    context, manager, computer, staticPartitions);
+                                    context, manager, computer, staticPartitions, writerListener);
         }
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
index 11b1a39e00b..260c293b766 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
 
-import java.io.IOException;
 import java.util.LinkedHashMap;
 
 import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
@@ -38,6 +38,7 @@ public class SingleDirectoryWriter<T> implements PartitionWriter<T> {
     private final PartitionTempFileManager manager;
     private final PartitionComputer<T> computer;
     private final LinkedHashMap<String, String> staticPartitions;
+    private final PartitionWriterListener writerListener;
 
     private OutputFormat<T> format;
 
@@ -46,25 +47,32 @@ public class SingleDirectoryWriter<T> implements PartitionWriter<T> {
             PartitionTempFileManager manager,
             PartitionComputer<T> computer,
             LinkedHashMap<String, String> staticPartitions) {
+        this(context, manager, computer, staticPartitions, new DefaultPartitionWriterListener());
+    }
+
+    public SingleDirectoryWriter(
+            Context<T> context,
+            PartitionTempFileManager manager,
+            PartitionComputer<T> computer,
+            LinkedHashMap<String, String> staticPartitions,
+            PartitionWriterListener writerListener) {
         this.context = context;
         this.manager = manager;
         this.computer = computer;
         this.staticPartitions = staticPartitions;
-    }
-
-    private void createFormat() throws IOException {
-        this.format =
-                context.createNewOutputFormat(
-                        staticPartitions.size() == 0
-                                ? manager.createPartitionDir()
-                                : manager.createPartitionDir(
-                                        generatePartitionPath(staticPartitions)));
+        this.writerListener = writerListener;
     }
 
     @Override
     public void write(T in) throws Exception {
         if (format == null) {
-            createFormat();
+            String partition = generatePartitionPath(staticPartitions);
+            Path path =
+                    staticPartitions.isEmpty()
+                            ? manager.createPartitionDir()
+                            : manager.createPartitionDir(partition);
+            format = context.createNewOutputFormat(path);
+            writerListener.onFileOpened(partition, path);
         }
         format.writeRecord(computer.projectColumnsToWrite(in));
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java
new file mode 100644
index 00000000000..41010b469dd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.BinPacking;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Coordinator for compaction in batch mode. It collects the files written by {@link
+ * BatchFileWriter}, decides whether files should be compacted, and determines which files should
+ * be merged into a single file.
+ *
+ * <p>NOTE: The coordination is a stable algorithm, which ensures that different attempts produce
+ * the same outputs.
+ */
+public class BatchCompactCoordinator extends AbstractStreamOperator<CoordinatorOutput>
+        implements OneInputStreamOperator<CoordinatorInput, CoordinatorOutput>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SupplierWithException<FileSystem, IOException> fsFactory;
+    private final long compactAverageSize;
+    private final long compactTargetSize;
+    private final StreamRecord<CoordinatorOutput> element = new StreamRecord<>(null);
+
+    private transient FileSystem fs;
+    // the mapping from written partitions to the corresponding files.
+    private transient Map<String, List<Path>> inputFiles;
+
+    public BatchCompactCoordinator(
+            SupplierWithException<FileSystem, IOException> fsFactory,
+            long compactAverageSize,
+            long compactTargetSize) {
+        this.fsFactory = fsFactory;
+        this.compactAverageSize = compactAverageSize;
+        this.compactTargetSize = compactTargetSize;
+    }
+
+    @Override
+    public void open() throws Exception {
+        fs = fsFactory.get();
+        inputFiles = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CoordinatorInput> element) throws Exception {
+        CoordinatorInput coordinatorInput = element.getValue();
+        if (coordinatorInput instanceof InputFile) {
+            InputFile file = (InputFile) coordinatorInput;
+            // collect the written files
+            inputFiles
+                    .computeIfAbsent(file.getPartition(), k -> new ArrayList<>())
+                    .add(file.getFile());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported input message: " + coordinatorInput);
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        for (Map.Entry<String, List<Path>> partitionFiles : inputFiles.entrySet()) {
+            compactPartitionFiles(partitionFiles.getKey(), partitionFiles.getValue());
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        inputFiles.clear();
+    }
+
+    private void compactPartitionFiles(String partition, List<Path> paths) throws IOException {
+        if (paths.isEmpty()) {
+            return;
+        }
+        int unitId = 0;
+        final Map<Path, Long> filesSize = getFilesSize(fs, paths);
+        // calculate the average size of these files
+        if (getAverageSize(filesSize) < compactAverageSize) {
+            // we should compact
+            // get the written files corresponding to the partition
+            Function<Path, Long> sizeFunc = filesSize::get;
+            // determine what files should be merged to a file
+            List<List<Path>> compactUnits = BinPacking.pack(paths, sizeFunc, compactTargetSize);
+            for (List<Path> compactUnit : compactUnits) {
+                // emit the compaction units containing the file paths
+                output.collect(
+                        element.replace(new CompactionUnit(unitId++, partition, compactUnit)));
+            }
+        } else {
+            // no need to merge these files, emit each single file to downstream for committing
+            for (Path path : paths) {
+                output.collect(
+                        element.replace(
+                                new CompactionUnit(
+                                        unitId++, partition, Collections.singletonList(path))));
+            }
+        }
+    }
+
+    private Map<Path, Long> getFilesSize(FileSystem fs, List<Path> paths) throws IOException {
+        Map<Path, Long> filesSize = new HashMap<>();
+        for (Path path : paths) {
+            long len = fs.getFileStatus(path).getLen();
+            filesSize.put(path, len);
+        }
+        return filesSize;
+    }
+
+    private double getAverageSize(Map<Path, Long> filesSize) {
+        int numFiles = 0;
+        long totalSz = 0;
+        for (Map.Entry<Path, Long> fileSize : filesSize.entrySet()) {
+            numFiles += 1;
+            totalSz += fileSize.getValue();
+        }
+        return totalSz / (numFiles * 1.0);
+    }
+}
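For illustration only, with hypothetical file names and sizes: the packing step used by compactPartitionFiles(...) above. When the average file size is below compactAverageSize, BinPacking groups the files into units of at most compactTargetSize; the class name and thresholds here are assumptions for the example.

import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.core.fs.Path;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PackingExample {
    static List<List<Path>> pack() {
        List<Path> paths = Arrays.asList(new Path("/tmp/f1"), new Path("/tmp/f2"), new Path("/tmp/f3"));
        Map<Path, Long> sizes = new HashMap<>();
        sizes.put(paths.get(0), 2L << 20); // 2 MB
        sizes.put(paths.get(1), 3L << 20); // 3 MB
        sizes.put(paths.get(2), 4L << 20); // 4 MB
        // the average size (3 MB) is below an assumed compactAverageSize of 16 MB, so packing runs;
        // with a 128 MB compactTargetSize all three files end up in a single compaction unit
        return BinPacking.pack(paths, sizes::get, 128L << 20);
    }
}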
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java
new file mode 100644
index 00000000000..72f718c1316
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CompactOperator for compaction in batch mode. It compacts files into a target file and then
+ * emits the compacted file's path to the downstream operator. The main logic is similar to {@link
+ * org.apache.flink.connector.file.table.stream.compact.CompactOperator} but skips some operations
+ * that are unnecessary in batch mode.
+ *
+ * <p>Note: if there is only a single file to compact, this operator does nothing and simply emits
+ * the file downstream. Also, the files to be compacted are not hidden files themselves; they are
+ * expected to live in a hidden or temporary directory, which must be ensured by the caller. This
+ * assumption makes it possible to skip renaming hidden files.
+ */
+public class BatchCompactOperator<T> extends AbstractStreamOperator<CompactOutput>
+        implements OneInputStreamOperator<CoordinatorOutput, CompactOutput>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String UNCOMPACTED_PREFIX = "uncompacted-";
+    public static final String COMPACTED_PREFIX = "compacted-";
+    public static final String ATTEMPT_PREFIX = "attempt-";
+
+    private final SupplierWithException<FileSystem, IOException> fsFactory;
+    private final CompactReader.Factory<T> readerFactory;
+    private final CompactWriter.Factory<T> writerFactory;
+
+    private transient FileSystem fileSystem;
+    private transient Map<String, List<Path>> compactedFiles;
+
+    public BatchCompactOperator(
+            SupplierWithException<FileSystem, IOException> fsFactory,
+            CompactReader.Factory<T> readerFactory,
+            CompactWriter.Factory<T> writerFactory) {
+        this.fsFactory = fsFactory;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+    }
+
+    @Override
+    public void open() throws Exception {
+        fileSystem = fsFactory.get();
+        compactedFiles = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CoordinatorOutput> element) throws Exception {
+        CoordinatorOutput value = element.getValue();
+        if (value instanceof CompactionUnit) {
+            CompactionUnit unit = (CompactionUnit) value;
+            String partition = unit.getPartition();
+            // these files should be merged to one file
+            List<Path> paths = unit.getPaths();
+            Configuration config =
+                    getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+            Path path = null;
+            if (paths.size() == 1) {
+                // for a single file, we only need to move it to the corresponding partition
+                // instead of compacting; the downstream commit operator does the moving
+                path = paths.get(0);
+            } else if (paths.size() > 1) {
+                Path targetPath =
+                        createCompactedFile(paths, getRuntimeContext().getAttemptNumber());
+                path =
+                        CompactFileUtils.doCompact(
+                                fileSystem,
+                                partition,
+                                paths,
+                                targetPath,
+                                config,
+                                readerFactory,
+                                writerFactory);
+            }
+            if (path != null) {
+                compactedFiles.computeIfAbsent(partition, k -> new ArrayList<>()).add(path);
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported input message: " + value);
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // emit the compacted files to downstream
+        output.collect(new StreamRecord<>(new CompactOutput(compactedFiles)));
+    }
+
+    @Override
+    public void close() throws Exception {
+        compactedFiles.clear();
+    }
+
+    private static Path createCompactedFile(List<Path> uncompactedFiles, int attemptNumber) {
+        Path path = convertFromUncompacted(uncompactedFiles.get(0));
+        // different attempts get different target paths so that they never write to the
+        // same path
+        return new Path(
+                path.getParent(), convertToCompactWithAttempt(attemptNumber, path.getName()));
+    }
+
+    public static Path convertFromUncompacted(Path path) {
+        Preconditions.checkArgument(
+                path.getName().startsWith(UNCOMPACTED_PREFIX),
+                "This should be uncompacted file: " + path);
+        return new Path(path.getParent(), path.getName().substring(UNCOMPACTED_PREFIX.length()));
+    }
+
+    private static String convertToCompactWithAttempt(int attemptNumber, String fileName) {
+        return String.format(
+                "%s%s%d-%s", COMPACTED_PREFIX, ATTEMPT_PREFIX, attemptNumber, fileName);
+    }
+}
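For illustration only: the naming convention implied by the helpers above, using a hypothetical file name and attempt number.

import org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
import org.apache.flink.core.fs.Path;

class NamingExample {
    static void show() {
        Path uncompacted = new Path("/tmp/dt=2023-01-17/uncompacted-part-0-0");
        // strips the "uncompacted-" prefix -> /tmp/dt=2023-01-17/part-0-0
        Path original = BatchCompactOperator.convertFromUncompacted(uncompacted);
        // createCompactedFile(...) (private) would then yield, for attempt 1:
        // /tmp/dt=2023-01-17/compacted-attempt-1-part-0-0
        System.out.println(original);
    }
}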
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
new file mode 100644
index 00000000000..9650ce3b8bd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.OutputFormatFactory;
+import org.apache.flink.connector.file.table.PartitionComputer;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.PartitionWriter;
+import org.apache.flink.connector.file.table.PartitionWriterFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+
+import java.util.LinkedHashMap;
+
+/**
+ * An operator for writing files in batch mode. Whenever it creates a new file to write to, it
+ * emits the file's path to the downstream operator.
+ */
+public class BatchFileWriter<T> extends AbstractStreamOperator<CoordinatorInput>
+        implements OneInputStreamOperator<T, CoordinatorInput>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileSystemFactory fsFactory;
+    private final Path tmpPath;
+    private final String[] partitionColumns;
+    private final boolean dynamicGrouped;
+    private final LinkedHashMap<String, String> staticPartitions;
+    private final PartitionComputer<T> computer;
+    private final OutputFormatFactory<T> formatFactory;
+    private final OutputFileConfig outputFileConfig;
+
+    private transient PartitionWriter<T> writer;
+
+    public BatchFileWriter(
+            FileSystemFactory fsFactory,
+            Path tmpPath,
+            String[] partitionColumns,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            OutputFormatFactory<T> formatFactory,
+            PartitionComputer<T> computer,
+            OutputFileConfig outputFileConfig) {
+        this.fsFactory = fsFactory;
+        this.tmpPath = tmpPath;
+        this.partitionColumns = partitionColumns;
+        this.dynamicGrouped = dynamicGrouped;
+        this.staticPartitions = staticPartitions;
+        this.formatFactory = formatFactory;
+        this.computer = computer;
+        this.outputFileConfig = outputFileConfig;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        try {
+            PartitionTempFileManager fileManager =
+                    new PartitionTempFileManager(
+                            fsFactory,
+                            tmpPath,
+                            getRuntimeContext().getIndexOfThisSubtask(),
+                            getRuntimeContext().getAttemptNumber(),
+                            outputFileConfig);
+            Configuration config =
+                    getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+            PartitionWriter.Context<T> context =
+                    new PartitionWriter.Context<>(config, formatFactory);
+
+            // whenever a file is opened for writing, the listener emits the file together with
+            // the partition it belongs to
+            PartitionWriter.PartitionWriterListener writerListener =
+                    (partition, file) ->
+                            output.collect(new StreamRecord<>(new InputFile(partition, file)));
+            writer =
+                    PartitionWriterFactory.<T>get(
+                                    partitionColumns.length - staticPartitions.size() > 0,
+                                    dynamicGrouped,
+                                    staticPartitions)
+                            .create(context, fileManager, computer, writerListener);
+
+        } catch (Exception e) {
+            throw new TableException("Exception in open", e);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<T> element) throws Exception {
+        try {
+            writer.write(element.getValue());
+        } catch (Exception e) {
+            throw new TableException("Exception in writeRecord", e);
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {}
+
+    @Override
+    public void close() throws Exception {
+        try {
+            staticPartitions.clear();
+            writer.close();
+        } catch (Exception e) {
+            throw new TableException("Exception in close", e);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java
new file mode 100644
index 00000000000..ea1c6cb13bb
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemCommitter;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionCommitPolicy;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Committer sink for partitions in batch mode. This is a single (non-parallel) task. It collects
+ * all the partition information, including the partitions and written files sent from upstream.
+ */
+public class BatchPartitionCommitterSink extends RichSinkFunction<CompactOutput> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileSystemFactory fsFactory;
+    private final TableMetaStoreFactory msFactory;
+    private final PartitionCommitPolicyFactory partitionCommitPolicyFactory;
+    private final Path tmpPath;
+    private final boolean overwrite;
+    private final boolean isToLocal;
+    private final String[] partitionColumns;
+    private final LinkedHashMap<String, String> staticPartitions;
+    private final ObjectIdentifier identifier;
+
+    private transient Map<String, List<Path>> partitionsFiles;
+
+    public BatchPartitionCommitterSink(
+            FileSystemFactory fsFactory,
+            TableMetaStoreFactory msFactory,
+            boolean overwrite,
+            boolean isToLocal,
+            Path tmpPath,
+            String[] partitionColumns,
+            LinkedHashMap<String, String> staticPartitions,
+            ObjectIdentifier identifier,
+            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
+        this.fsFactory = fsFactory;
+        this.msFactory = msFactory;
+        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+        this.tmpPath = tmpPath;
+        this.identifier = identifier;
+        this.overwrite = overwrite;
+        this.isToLocal = isToLocal;
+        this.partitionColumns = partitionColumns;
+        this.staticPartitions = staticPartitions;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        partitionsFiles = new HashMap<>();
+    }
+
+    @Override
+    public void invoke(CompactMessages.CompactOutput compactOutput, Context context)
+            throws Exception {
+        for (Map.Entry<String, List<Path>> compactFiles :
+                compactOutput.getCompactedFiles().entrySet()) {
+            // collect the written partition and written files
+            partitionsFiles
+                    .computeIfAbsent(compactFiles.getKey(), k -> new ArrayList<>())
+                    .addAll(compactFiles.getValue());
+        }
+    }
+
+    @Override
+    public void finish() throws Exception {
+        try {
+            List<PartitionCommitPolicy> policies = Collections.emptyList();
+            if (partitionCommitPolicyFactory != null) {
+                policies =
+                        partitionCommitPolicyFactory.createPolicyChain(
+                                getRuntimeContext().getUserCodeClassLoader(),
+                                () -> {
+                                    try {
+                                        return fsFactory.create(tmpPath.toUri());
+                                    } catch (IOException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                });
+            }
+            // commit the partitions with the given files
+            // it will move the written temporary files to the partitions' locations
+            FileSystemCommitter committer =
+                    new FileSystemCommitter(
+                            fsFactory,
+                            msFactory,
+                            overwrite,
+                            tmpPath,
+                            partitionColumns.length,
+                            isToLocal,
+                            identifier,
+                            staticPartitions,
+                            policies);
+            committer.commitPartitionsWithFiles(partitionsFiles);
+        } catch (Exception e) {
+            throw new TableException("Exception in finish", e);
+        } finally {
+            try {
+                fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
+            } catch (IOException ignore) {
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        staticPartitions.clear();
+        partitionsFiles.clear();
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java
new file mode 100644
index 00000000000..eee69bcb12e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactContext;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for compacting files. */
+public class CompactFileUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactFileUtils.class);
+
+    /**
+     * Do compaction: if the target file already exists, do nothing. Otherwise, read the input
+     * files and write them into the target file to achieve the compaction.
+     */
+    public static @Nullable <T> Path doCompact(
+            FileSystem fileSystem,
+            String partition,
+            List<Path> paths,
+            Path target,
+            Configuration config,
+            CompactReader.Factory<T> readerFactory,
+            CompactWriter.Factory<T> writerFactory)
+            throws IOException {
+        if (paths.size() == 0) {
+            return null;
+        }
+
+        if (fileSystem.exists(target)) {
+            return target;
+        }
+
+        checkExist(fileSystem, paths);
+
+        long startMillis = System.currentTimeMillis();
+
+        Map<Path, Long> inputMap = new HashMap<>();
+        for (Path path : paths) {
+            inputMap.put(path, fileSystem.getFileStatus(path).getLen());
+        }
+
+        doMultiFilesCompact(
+                partition, paths, target, config, fileSystem, readerFactory, writerFactory);
+        Map<Path, Long> targetMap = new HashMap<>();
+        targetMap.put(target, fileSystem.getFileStatus(target).getLen());
+        double costSeconds = ((double) (System.currentTimeMillis() - startMillis)) / 1000;
+        LOG.info(
+                "Compaction time cost is '{}S', output per file as following format: name=size(byte), target file is '{}', input files are '{}'",
+                costSeconds,
+                targetMap,
+                inputMap);
+        return target;
+    }
+
+    private static <T> void doMultiFilesCompact(
+            String partition,
+            List<Path> files,
+            Path dst,
+            Configuration config,
+            FileSystem fileSystem,
+            CompactReader.Factory<T> readerFactory,
+            CompactWriter.Factory<T> writerFactory)
+            throws IOException {
+        CompactWriter<T> writer =
+                writerFactory.create(CompactContext.create(config, fileSystem, partition, dst));
+
+        for (Path path : files) {
+            try (CompactReader<T> reader =
+                    readerFactory.create(
+                            CompactContext.create(config, fileSystem, partition, path))) {
+                T record;
+                while ((record = reader.read()) != null) {
+                    writer.write(record);
+                }
+            }
+        }
+
+        // commit immediately
+        writer.commit();
+    }
+
+    private static void checkExist(FileSystem fileSystem, List<Path> candidates)
+            throws IOException {
+        for (Path path : candidates) {
+            if (!fileSystem.exists(path)) {
+                throw new IOException("Compaction file not exist: " + path);
+            }
+        }
+    }
+}
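
A minimal sketch of how CompactFileUtils#doCompact above is driven, assuming a reader/writer
factory pair for the format in use (for example CompactBulkReader.factory(...) on the read side,
as the tests in this commit do); the variable names and paths are illustrative only:

    Path compacted =
            CompactFileUtils.doCompact(
                    fileSystem,                      // org.apache.flink.core.fs.FileSystem
                    "p1=1/",                         // partition spec, "" for non-partitioned tables
                    Arrays.asList(in0, in1),         // uncompacted input files
                    new Path("/tmp/p1=1/compacted-attempt-0-f0"),
                    new Configuration(),
                    readerFactory,                   // CompactReader.Factory<T>
                    writerFactory);                  // CompactWriter.Factory<T>
    // doCompact returns null for an empty input list and returns an already existing target as-is.
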
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
index fc905cd34a9..a28b57eb03f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -136,6 +137,25 @@ public class CompactMessages {
         }
     }
 
+    /**
+     * The output of {@link
+     * org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator}.
+     */
+    public static class CompactOutput implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Map<String, List<Path>> compactedFiles;
+
+        public CompactOutput(Map<String, List<Path>> compactedFiles) {
+            this.compactedFiles = compactedFiles;
+        }
+
+        public Map<String, List<Path>> getCompactedFiles() {
+            return compactedFiles;
+        }
+    }
+
     /** A flag to end compaction. */
     public static class EndCompaction implements CoordinatorOutput {
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
index 313f07f81f4..65e1c7d3147 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
@@ -38,7 +38,7 @@ import java.util.Optional;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FileSystemCommitter}. */
-class FileSystemCommitterTest {
+public class FileSystemCommitterTest {
 
     private static final String SUCCESS_FILE_NAME = "_SUCCESS";
 
@@ -258,12 +258,13 @@ class FileSystemCommitterTest {
         assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
     }
 
-    static class TestMetaStoreFactory implements TableMetaStoreFactory {
+    /** A {@link TableMetaStoreFactory} for testing purposes. */
+    public static class TestMetaStoreFactory implements TableMetaStoreFactory {
         private static final long serialVersionUID = 1L;
 
         private final Path outputPath;
 
-        TestMetaStoreFactory(Path outputPath) {
+        public TestMetaStoreFactory(Path outputPath) {
             this.outputPath = outputPath;
         }
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java
new file mode 100644
index 00000000000..8aaff411f4c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for batch compact coordinator. */
+public class BatchCompactCoordinatorTest extends AbstractCompactTestBase {
+
+    @Test
+    public void testCompactIsNotNeeded() throws Exception {
+        long averageSize = 5;
+        long targetSize = 50;
+        BatchCompactCoordinator compactCoordinator =
+                new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+
+        try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+            testHarness.setup();
+            testHarness.open();
+
+            Path f1 = newFile("f1", 10);
+            Path f2 = newFile("f2", 5);
+            Path f3 = newFile("f3", 16);
+
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f1)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f2)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f3)));
+            testHarness.endInput();
+
+            // the average file size is not less than the threshold of 5, so no compaction is
+            // needed and each output compaction unit should contain only one file
+            assertCompactUnits(
+                    testHarness.extractOutputValues(),
+                    Arrays.asList(
+                            new CompactionUnit(0, "", Collections.singletonList(f1)),
+                            new CompactionUnit(1, "", Collections.singletonList(f2)),
+                            new CompactionUnit(2, "", Collections.singletonList(f3))));
+        }
+    }
+
+    @Test
+    public void testCompactNonPartitionedTable() throws Exception {
+        long averageSize = 14;
+        long targetSize = 16;
+        BatchCompactCoordinator compactCoordinator =
+                new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+        try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+            testHarness.setup();
+            testHarness.open();
+
+            Path f1 = newFile("f1", 10);
+            Path f2 = newFile("f2", 5);
+            Path f3 = newFile("f3", 20);
+
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f1)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f2)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("", f3)));
+            testHarness.endInput();
+
+            List<CoordinatorOutput> coordinatorOutputs = testHarness.extractOutputValues();
+            // f1 + f2 should be merged
+            assertCompactUnits(
+                    coordinatorOutputs,
+                    Arrays.asList(
+                            new CompactionUnit(0, "", Arrays.asList(f1, f2)),
+                            new CompactionUnit(1, "", Collections.singletonList(f3))));
+        }
+    }
+
+    @Test
+    public void testCompactPartitionedTable() throws Exception {
+        long averageSize = 10;
+        long targetSize = 16;
+        BatchCompactCoordinator compactCoordinator =
+                new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+
+        try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+            testHarness.setup();
+            testHarness.open();
+
+            // the files for partition "p1=1/", average size is 7.5, should be merged
+            Path f1 = newFile("f1", 10);
+            Path f2 = newFile("f2", 5);
+
+            // the files for partition "p1=2/", average size is 14, shouldn't be merged
+            Path f3 = newFile("f3", 20);
+            Path f4 = newFile("f4", 8);
+
+            // partition "p1=1/" should be merged
+            testHarness.processElement(new StreamRecord<>(new InputFile("p1=1/", f1)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("p1=1/", f2)));
+
+            // partition "p1=2/" should be merged
+            testHarness.processElement(new StreamRecord<>(new InputFile("p1=2/", f3)));
+            testHarness.processElement(new StreamRecord<>(new InputFile("p1=2/", f4)));
+
+            testHarness.endInput();
+
+            List<CoordinatorOutput> coordinatorOutputs = testHarness.extractOutputValues();
+            // f1 and f2 should be packed into a single compaction unit
+            // f3 and f4 should each be their own compaction unit
+            assertCompactUnits(
+                    coordinatorOutputs,
+                    Arrays.asList(
+                            new CompactionUnit(0, "p1=1/", Arrays.asList(f1, f2)),
+                            new CompactionUnit(1, "p1=2/", Collections.singletonList(f3)),
+                            new CompactionUnit(2, "p1=2/", Collections.singletonList(f4))));
+        }
+    }
+
+    private void assertCompactUnits(
+            List<CoordinatorOutput> coordinatorOutputs,
+            List<CompactionUnit> expectCompactionUnits) {
+        assertThat(coordinatorOutputs.size()).isEqualTo(expectCompactionUnits.size());
+        coordinatorOutputs.sort(Comparator.comparing(o -> ((CompactionUnit) o).getPartition()));
+        expectCompactionUnits.sort(Comparator.comparing(CompactionUnit::getPartition));
+        for (int i = 0; i < coordinatorOutputs.size(); i++) {
+            CoordinatorOutput coordinatorOutput = coordinatorOutputs.get(i);
+            assertThat(coordinatorOutput).isInstanceOf(CompactionUnit.class);
+
+            CompactionUnit compactionUnit = (CompactionUnit) coordinatorOutput;
+            CompactionUnit expectCompactionUnit = expectCompactionUnits.get(i);
+
+            // assert compact unit is equal
+            assertThat(compactionUnit.getPartition())
+                    .isEqualTo(expectCompactionUnit.getPartition());
+            assertThat(compactionUnit.getPaths()).isEqualTo(expectCompactionUnit.getPaths());
+        }
+    }
+}
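
As a worked example of the grouping asserted in testCompactPartitionedTable above (assuming the
coordinator compacts a partition only when its average file size is below averageSize and packs
files into units of at most targetSize bytes): partition "p1=1/" holds files of 10 and 5 bytes,
average 7.5 < 10 and 10 + 5 = 15 <= 16, so f1 and f2 are packed into one compaction unit;
partition "p1=2/" holds files of 20 and 8 bytes, average 14 >= 10, so no compaction is needed and
f3 and f4 each stay in their own unit.
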
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java
new file mode 100644
index 00000000000..80170fae1ab
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.connector.file.table.stream.compact.TestByteFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BatchCompactOperator}. */
+public class BatchCompactOperatorTest extends AbstractCompactTestBase {
+
+    @Test
+    public void testCompact() throws Exception {
+        // test compact
+        BatchCompactOperator<Byte> compactOperator = createBatchCompactOperator();
+        try (OneInputStreamOperatorTestHarness<CoordinatorOutput, CompactOutput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(compactOperator)) {
+            testHarness.setup();
+            testHarness.open();
+
+            Path f0 = newFile("uncompacted-f0", 3);
+            Path f1 = newFile("uncompacted-f1", 2);
+            Path f2 = newFile("uncompacted-f2", 2);
+
+            Path f3 = newFile("uncompacted-f3", 10);
+
+            testHarness.processElement(
+                    new StreamRecord<>(new CompactionUnit(1, "p=p1/", Arrays.asList(f0, f1, f2))));
+            testHarness.processElement(
+                    new StreamRecord<>(
+                            new CompactionUnit(2, "p=p2/", Collections.singletonList(f3))));
+
+            testHarness.endInput();
+            List<CompactOutput> compactOutputs = testHarness.extractOutputValues();
+            Map<String, List<Path>> expectCompactedFiles = new HashMap<>();
+
+            expectCompactedFiles.put(
+                    "p=p1/",
+                    Collections.singletonList(new Path(folder + "/compacted-attempt-0-f0")));
+            // a single file is not compacted, so its name won't change
+            expectCompactedFiles.put(
+                    "p=p2/", Collections.singletonList(new Path(folder + "/uncompacted-f3")));
+
+            // check compacted file
+            byte[] bytes =
+                    FileUtils.readAllBytes(
+                            new File(folder.getPath(), "compacted-attempt-0-f0").toPath());
+            Arrays.sort(bytes);
+            assertThat(bytes).isEqualTo(new byte[] {0, 0, 0, 1, 1, 1, 2});
+
+            bytes = FileUtils.readAllBytes(new File(folder.getPath(), "uncompacted-f3").toPath());
+            assertThat(bytes).isEqualTo(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+            assertCompactOutput(
+                    compactOutputs,
+                    Collections.singletonList(new CompactOutput(expectCompactedFiles)));
+        }
+    }
+
+    private void assertCompactOutput(
+            List<CompactOutput> actualCompactOutputs, List<CompactOutput> expectCompactOutputs) {
+        assertThat(actualCompactOutputs.size()).isEqualTo(expectCompactOutputs.size());
+        for (int i = 0; i < actualCompactOutputs.size(); i++) {
+            CompactOutput actualCompactOutput = actualCompactOutputs.get(i);
+            CompactOutput expectCompactOutput = expectCompactOutputs.get(i);
+            assertThat(actualCompactOutput.getCompactedFiles())
+                    .isEqualTo(expectCompactOutput.getCompactedFiles());
+        }
+    }
+
+    private BatchCompactOperator<Byte> createBatchCompactOperator() {
+        return new BatchCompactOperator<>(
+                () -> folder.getFileSystem(),
+                CompactBulkReader.factory(TestByteFormat.bulkFormat()),
+                context -> {
+                    Path path = context.getPath();
+                    Path tempPath = new Path(path.getParent(), "." + path.getName());
+                    FSDataOutputStream out =
+                            context.getFileSystem()
+                                    .create(tempPath, FileSystem.WriteMode.OVERWRITE);
+                    return new CompactWriter<Byte>() {
+                        @Override
+                        public void write(Byte record) throws IOException {
+                            out.write(record);
+                        }
+
+                        @Override
+                        public void commit() throws IOException {
+                            out.close();
+                            context.getFileSystem().rename(tempPath, path);
+                        }
+                    };
+                });
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java
new file mode 100644
index 00000000000..cf1358c4a55
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.RowPartitionComputer;
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BatchFileWriter}. */
+public class BatchFileWriterTest extends AbstractCompactTestBase {
+    private final String[] columnNames = new String[] {"a", "b", "c"};
+    private FileSystemFactory fsFactory = FileSystem::get;
+
+    @Test
+    public void testWriteWithoutPartition() throws Exception {
+        String[] partitionColumns = new String[0];
+        BatchFileWriter<Row> fileWriter =
+                createBatchFileWriter(columnNames, partitionColumns, new LinkedHashMap<>(), false);
+        PartitionTempFileManager tempFileManager =
+                new PartitionTempFileManager(
+                        fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+        try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+            testHarness.setup();
+            testHarness.open();
+
+            // write data
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+            List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+
+            // should only write and emit one file
+            assertInputFile(
+                    coordinatorInputs,
+                    1,
+                    Collections.singletonList(""),
+                    Collections.singletonList(tempFileManager.createPartitionDir()));
+        }
+    }
+
+    @Test
+    public void testWriteWithStaticPartition() throws Exception {
+        String[] partitionColumns = new String[] {"b", "c"};
+        LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
+        staticParts.put("b", "p1");
+        staticParts.put("c", "p2");
+
+        BatchFileWriter<Row> fileWriter =
+                createBatchFileWriter(partitionColumns, partitionColumns, staticParts, false);
+        PartitionTempFileManager tempFileManager =
+                new PartitionTempFileManager(
+                        fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+        try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+            testHarness.setup();
+            testHarness.open();
+
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+            List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+
+            // should only write and emit one file
+            assertInputFile(
+                    coordinatorInputs,
+                    1,
+                    Collections.singletonList("b=p1/c=p2/"),
+                    Collections.singletonList(tempFileManager.createPartitionDir("b=p1/c=p2/")));
+        }
+    }
+
+    @Test
+    public void testWriteWithoutDynamicPartitionGrouped() throws Exception {
+        // test without dynamicGrouped
+        testWriteWithDynamicPartition(false);
+    }
+
+    @Test
+    public void testWriteWithDynamicPartitionGrouped() throws Exception {
+        // test with dynamicGrouped
+        testWriteWithDynamicPartition(true);
+    }
+
+    private void testWriteWithDynamicPartition(boolean dynamicGrouped) throws Exception {
+        String[] partitionColumns = new String[] {"b", "c"};
+
+        BatchFileWriter<Row> fileWriter =
+                createBatchFileWriter(
+                        columnNames, partitionColumns, new LinkedHashMap<>(), dynamicGrouped);
+        PartitionTempFileManager tempFileManager =
+                new PartitionTempFileManager(
+                        fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+        try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+                new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+            testHarness.setup();
+            testHarness.open();
+
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+            testHarness.processElement(new StreamRecord<>(Row.of("a1", 2, 1)));
+            List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+            // should write two files, one for each partition
+            assertInputFile(
+                    coordinatorInputs,
+                    2,
+                    Arrays.asList("b=1/c=2/", "b=2/c=1/"),
+                    Arrays.asList(
+                            tempFileManager.createPartitionDir("b=1/c=2/"),
+                            tempFileManager.createPartitionDir("b=2/c=1/")));
+        }
+    }
+
+    private void assertInputFile(
+            List<CoordinatorInput> coordinatorInputs,
+            int expectSize,
+            List<String> expectedPartitions,
+            List<Path> expectedFilePaths) {
+        assertThat(coordinatorInputs).hasSize(expectSize);
+        for (int i = 0; i < expectSize; i++) {
+            CoordinatorInput input = coordinatorInputs.get(i);
+            assertThat(input).isInstanceOf(InputFile.class);
+            InputFile inputFile = (InputFile) input;
+            assertThat(inputFile.getPartition()).isEqualTo(expectedPartitions.get(i));
+            assertThat(inputFile.getFile()).isEqualTo(expectedFilePaths.get(i));
+        }
+    }
+
+    private BatchFileWriter<Row> createBatchFileWriter(
+            String[] columnNames,
+            String[] partitionColumns,
+            LinkedHashMap<String, String> staticPartitions,
+            boolean dynamicGrouped) {
+        return new BatchFileWriter<>(
+                fsFactory,
+                folder,
+                partitionColumns,
+                dynamicGrouped,
+                staticPartitions,
+                TextOutputFormat::new,
+                new RowPartitionComputer("default", columnNames, partitionColumns),
+                OutputFileConfig.builder().build());
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
new file mode 100644
index 00000000000..13f9b84198b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemCommitterTest;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+
+/** Test for {@link BatchPartitionCommitterSink}. */
+public class BatchPartitionCommitterSinkTest {
+    private final FileSystemFactory fileSystemFactory = FileSystem::get;
+
+    private TableMetaStoreFactory metaStoreFactory;
+    private ObjectIdentifier identifier;
+
+    @TempDir private java.nio.file.Path path;
+    @TempDir private java.nio.file.Path outputPath;
+
+    @BeforeEach
+    public void before() {
+        metaStoreFactory =
+                new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString()));
+        identifier = ObjectIdentifier.of("hiveCatalog", "default", "test");
+    }
+
+    @Test
+    public void testPartitionCommit() throws Exception {
+        BatchPartitionCommitterSink committerSink =
+                new BatchPartitionCommitterSink(
+                        fileSystemFactory,
+                        metaStoreFactory,
+                        false,
+                        false,
+                        new Path(path.toString()),
+                        new String[] {"p1", "p2"},
+                        new LinkedHashMap<>(),
+                        identifier,
+                        new PartitionCommitPolicyFactory(null, null, null));
+        committerSink.open(new Configuration());
+
+        List<Path> pathList1 = createFiles(path, "task-1/p1=0/p2=0/", "f1", "f2");
+        List<Path> pathList2 = createFiles(path, "task-2/p1=0/p2=0/", "f3");
+        List<Path> pathList3 = createFiles(path, "task-2/p1=0/p2=1/", "f4");
+        Map<String, List<Path>> compactedFiles = new HashMap<>();
+        pathList1.addAll(pathList2);
+        compactedFiles.put("p1=0/p2=0/", pathList1);
+        compactedFiles.put("p1=0/p2=1/", pathList3);
+
+        committerSink.invoke(new CompactMessages.CompactOutput(compactedFiles), TEST_SINK_CONTEXT);
+        committerSink.setRuntimeContext(TEST_RUNTIME_CONTEXT);
+        committerSink.finish();
+        committerSink.close();
+        assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
+        assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
+        assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f3")).exists();
+        assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f4")).exists();
+    }
+
+    private List<Path> createFiles(java.nio.file.Path parent, String path, String... files)
+            throws IOException {
+        java.nio.file.Path dir = Files.createDirectories(Paths.get(parent.toString(), path));
+        List<Path> paths = new ArrayList<>();
+        for (String file : files) {
+            paths.add(new Path(Files.createFile(dir.resolve(file)).toFile().getPath()));
+        }
+        return paths;
+    }
+
+    private static final RuntimeContext TEST_RUNTIME_CONTEXT = getMockRuntimeContext();
+    private static final SinkFunction.Context TEST_SINK_CONTEXT =
+            new SinkFunction.Context() {
+                @Override
+                public long currentProcessingTime() {
+                    return 0;
+                }
+
+                @Override
+                public long currentWatermark() {
+                    return 0;
+                }
+
+                @Override
+                public Long timestamp() {
+                    return null;
+                }
+            };
+
+    private static RuntimeContext getMockRuntimeContext() {
+        RuntimeContext context = Mockito.mock(RuntimeContext.class);
+        doReturn(Thread.currentThread().getContextClassLoader())
+                .when(context)
+                .getUserCodeClassLoader();
+        return context;
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
index 8e18069444a..2ea12647ec6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
@@ -34,14 +34,14 @@ public abstract class AbstractCompactTestBase {
 
     @TempDir private java.nio.file.Path path;
 
-    Path folder;
+    protected Path folder;
 
     @BeforeEach
     void before() {
         folder = new Path(path.toString());
     }
 
-    Path newFile(String name, int len) throws IOException {
+    protected Path newFile(String name, int len) throws IOException {
         Path path = new Path(folder, name);
         File file = new File(path.getPath());
         file.delete();