Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:22 UTC

[flink] branch master updated (3cf0393 -> aec2d38)

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3cf0393  [FLINK-24385][table] Introduce TRY_CAST
     new 6a0d6fa  [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.
     new 51adf0f  [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
     new b2ea16c  [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable.
     new 824752c8 [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
     new 255d656  [FLINK-25583][connectors/filesystem] Add compaction support for FileSink.
     new aec2d38  [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../7602816f-5c01-4b7a-9e3e-235dfedec245           |   1 -
 .../apache/flink/connector/file/sink/FileSink.java | 169 ++++++-
 .../connector/file/sink/FileSinkCommittable.java   |  74 ++-
 .../file/sink/FileSinkCommittableSerializer.java   |  46 +-
 .../file/sink/committer/FileCommitter.java         |  21 +
 .../file/sink/compactor/ConcatFileCompactor.java   |  72 +++
 .../file/sink/compactor/DecoderBasedReader.java    |  96 ++++
 .../file/sink/compactor/FileCompactStrategy.java   | 112 +++++
 .../file/sink/compactor/FileCompactor.java         |  48 ++
 .../sink/compactor/IdenticalFileCompactor.java     |  45 ++
 .../sink/compactor/InputFormatBasedReader.java     |  78 +++
 .../compactor/OutputStreamBasedFileCompactor.java  |  53 ++
 .../sink/compactor/RecordWiseFileCompactor.java    |  84 ++++
 .../file/sink/compactor/SimpleStringDecoder.java   |  55 +++
 .../compactor/operator/CompactCoordinator.java     | 265 ++++++++++
 .../operator/CompactCoordinatorFactory.java        |  77 +++
 .../operator/CompactCoordinatorStateHandler.java   |  89 ++++
 .../CompactCoordinatorStateHandlerFactory.java     |  87 ++++
 .../sink/compactor/operator/CompactService.java    | 147 ++++++
 .../sink/compactor/operator/CompactorOperator.java | 343 +++++++++++++
 .../operator/CompactorOperatorFactory.java         |  93 ++++
 .../operator/CompactorOperatorStateHandler.java    | 323 +++++++++++++
 .../CompactorOperatorStateHandlerFactory.java      |  84 ++++
 .../sink/compactor/operator/CompactorRequest.java  |  70 +++
 .../operator/CompactorRequestSerializer.java       |  98 ++++
 .../operator/CompactorRequestTypeInfo.java         | 120 +++++
 .../file/sink/writer/FileWriterBucket.java         |   5 +-
 .../file/sink/BatchCompactingFileSinkITCase.java   |  69 +++
 .../file/sink/FileCommittableSerializerTest.java   |  25 +-
 ...FileSinkCommittableSerializerMigrationTest.java |   4 +-
 .../file/sink/FileSinkCompactionSwitchITCase.java  | 391 +++++++++++++++
 .../flink/connector/file/sink/FileSinkITBase.java  |   4 +-
 .../sink/StreamingCompactingFileSinkITCase.java    |  69 +++
 .../file/sink/committer/FileCommitterTest.java     |  14 +-
 .../sink/compactor/AbstractCompactTestBase.java    |  58 +++
 .../sink/compactor/CompactCoordinatorTest.java     | 449 +++++++++++++++++
 .../file/sink/compactor/CompactorOperatorTest.java | 534 +++++++++++++++++++++
 .../operator/CompactorRequestTypeInfoTest.java     |  41 ++
 .../file/sink/utils/FileSinkTestUtils.java         |  91 +++-
 .../sink/utils/IntegerFileSinkTestDataUtils.java   |  26 +
 .../functions/sink/filesystem/BucketWriter.java    |  26 +
 .../sink/filesystem/BulkBucketWriter.java          |   5 +-
 .../functions/sink/filesystem/BulkPartWriter.java  |   5 +-
 ...ssFileWriter.java => CompactingFileWriter.java} |  47 +-
 .../sink/filesystem/InProgressFileWriter.java      |  23 +-
 ... => OutputStreamBasedCompactingFileWriter.java} |  35 +-
 .../OutputStreamBasedPartFileWriter.java           | 198 +++++++-
 ...er.java => RecordWiseCompactingFileWriter.java} |  33 +-
 .../sink/filesystem/RowWiseBucketWriter.java       |   5 +-
 .../sink/filesystem/RowWisePartWriter.java         |  12 +-
 .../apache/flink/util/CloseShieldOutputStream.java |  56 +++
 .../SerializableSupplierWithException.java         |  34 ++
 .../flink/formats/avro/AvroWriterFactory.java      |  37 +-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  64 ++-
 .../translators/SinkTransformationTranslator.java  |  11 -
 flink-tests/pom.xml                                |   8 +
 56 files changed, 4952 insertions(+), 177 deletions(-)
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/DecoderBasedReader.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/SimpleStringDecoder.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorFactory.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandlerFactory.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorFactory.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandlerFactory.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestSerializer.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfo.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/AbstractCompactTestBase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfoTest.java
 copy flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{InProgressFileWriter.java => CompactingFileWriter.java} (51%)
 copy flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{RowWisePartWriter.java => OutputStreamBasedCompactingFileWriter.java} (50%)
 copy flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{RowWisePartWriter.java => RecordWiseCompactingFileWriter.java} (50%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/CloseShieldOutputStream.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java

[flink] 05/06: [FLINK-25583][connectors/filesystem] Add compaction support for FileSink.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 255d65613ad348f66b1043211f51ce7a56dfeac6
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 16 18:34:42 2022 +0800

    [FLINK-25583][connectors/filesystem] Add compaction support for FileSink.
---
 .../apache/flink/connector/file/sink/FileSink.java | 169 ++++++-
 .../file/sink/compactor/ConcatFileCompactor.java   |  72 +++
 .../file/sink/compactor/DecoderBasedReader.java    |  96 ++++
 .../file/sink/compactor/FileCompactStrategy.java   | 112 +++++
 .../file/sink/compactor/FileCompactor.java         |  48 ++
 .../sink/compactor/IdenticalFileCompactor.java     |  45 ++
 .../sink/compactor/InputFormatBasedReader.java     |  78 +++
 .../compactor/OutputStreamBasedFileCompactor.java  |  53 ++
 .../sink/compactor/RecordWiseFileCompactor.java    |  84 ++++
 .../file/sink/compactor/SimpleStringDecoder.java   |  55 +++
 .../compactor/operator/CompactCoordinator.java     | 265 ++++++++++
 .../operator/CompactCoordinatorFactory.java        |  77 +++
 .../operator/CompactCoordinatorStateHandler.java   |  89 ++++
 .../CompactCoordinatorStateHandlerFactory.java     |  87 ++++
 .../sink/compactor/operator/CompactService.java    | 147 ++++++
 .../sink/compactor/operator/CompactorOperator.java | 343 +++++++++++++
 .../operator/CompactorOperatorFactory.java         |  93 ++++
 .../operator/CompactorOperatorStateHandler.java    | 323 +++++++++++++
 .../CompactorOperatorStateHandlerFactory.java      |  84 ++++
 .../sink/compactor/operator/CompactorRequest.java  |  70 +++
 .../operator/CompactorRequestSerializer.java       |  98 ++++
 .../operator/CompactorRequestTypeInfo.java         | 120 +++++
 ...FileSinkCommittableSerializerMigrationTest.java |   4 +-
 .../sink/compactor/AbstractCompactTestBase.java    |  58 +++
 .../sink/compactor/CompactCoordinatorTest.java     | 449 +++++++++++++++++
 .../file/sink/compactor/CompactorOperatorTest.java | 534 +++++++++++++++++++++
 .../operator/CompactorRequestTypeInfoTest.java     |  41 ++
 .../file/sink/utils/FileSinkTestUtils.java         |  80 ++-
 .../sink/utils/IntegerFileSinkTestDataUtils.java   |  26 +
 .../apache/flink/util/CloseShieldOutputStream.java |  56 +++
 .../SerializableSupplierWithException.java         |  34 ++
 .../flink/formats/avro/AvroWriterFactory.java      |  37 +-
 flink-tests/pom.xml                                |   8 +
 33 files changed, 3886 insertions(+), 49 deletions(-)
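
Taken together, this commit adds an opt-in compaction stage to the FileSink, wired in between
the writers and the committers via addPreCommitTopology. A rough usage sketch assembled from
the builder API in the diff below (the output path, encoder and thresholds are placeholder
choices, not part of this change):

    FileCompactStrategy strategy =
            FileCompactStrategy.Builder.newBuilder()
                    .enableCompactionOnCheckpoint(5)     // compact every 5 checkpoints, and/or
                    .setSizeThreshold(16 * 1024 * 1024)  // once ~16 MiB of files are pending
                    .build();

    FileSink<String> sink =
            FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                    .enableCompact(strategy, new ConcatFileCompactor())
                    .build();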

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 566037f..e0f9da7 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -23,11 +23,21 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorFactory;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorStateHandlerFactory;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandlerFactory;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo;
 import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
 import org.apache.flink.connector.file.sink.writer.FileWriter;
 import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
@@ -36,6 +46,10 @@ import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializ
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
@@ -46,6 +60,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.IOException;
@@ -109,7 +124,8 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class FileSink<IN>
         implements StatefulSink<IN, FileWriterBucketState>,
                 TwoPhaseCommittingSink<IN, FileSinkCommittable>,
-                WithCompatibleState {
+                WithCompatibleState,
+                WithPreCommitTopology<IN, FileSinkCommittable> {
 
     private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
 
@@ -177,6 +193,74 @@ public class FileSink<IN>
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    @Override
+    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+        FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+        if (strategy == null) {
+            // Compaction is not enabled. Handlers will still be added to process the remaining
+            // states of the compact coordinator and the compactor operators.
+            SingleOutputStreamOperator<
+                            Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
+                    coordinatorOp =
+                            committableStream
+                                    .forward()
+                                    .transform(
+                                            "CompactorCoordinator",
+                                            new EitherTypeInfo<>(
+                                                    committableStream.getType(),
+                                                    new CompactorRequestTypeInfo(
+                                                            bucketsBuilder
+                                                                    ::getCommittableSerializer)),
+                                            new CompactCoordinatorStateHandlerFactory(
+                                                    bucketsBuilder::getCommittableSerializer))
+                                    .setParallelism(committableStream.getParallelism())
+                                    .uid("FileSinkCompactorCoordinator");
+
+            return coordinatorOp
+                    .forward()
+                    .transform(
+                            "CompactorOperator",
+                            committableStream.getType(),
+                            new CompactorOperatorStateHandlerFactory(
+                                    bucketsBuilder::getCommittableSerializer,
+                                    bucketsBuilder::createBucketWriter))
+                    .setParallelism(committableStream.getParallelism())
+                    .uid("FileSinkCompactorOperator");
+        }
+
+        // An explicit rebalance is required here, or the partitioner will default to forward,
+        // which is in fact the partitioner from the writers to the committers.
+        SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+                committableStream
+                        .rebalance()
+                        .transform(
+                                "CompactorCoordinator",
+                                new CompactorRequestTypeInfo(
+                                        bucketsBuilder::getCommittableSerializer),
+                                new CompactCoordinatorFactory(
+                                        strategy, bucketsBuilder::getCommittableSerializer))
+                        .setParallelism(1)
+                        .uid("FileSinkCompactorCoordinator");
+
+        // parallelism of the compactors is not configurable at present, since it must be identical
+        // to that of the committers, or the committable summary and the committables may be
+        // distributed to different committers, which will cause a failure
+        TypeInformation<CommittableMessage<FileSinkCommittable>> committableType =
+                committableStream.getType();
+        return coordinatorOp
+                .transform(
+                        "CompactorOperator",
+                        committableType,
+                        new CompactorOperatorFactory(
+                                strategy,
+                                bucketsBuilder.getFileCompactor(),
+                                bucketsBuilder::getCommittableSerializer,
+                                bucketsBuilder::createBucketWriter))
+                .setParallelism(committableStream.getParallelism())
+                .uid("FileSinkCompactorOperator");
+    }
+
     /** The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */
     @Internal
     private abstract static class BucketsBuilder<IN, T extends BucketsBuilder<IN, T>>
@@ -204,6 +288,15 @@ public class FileSink<IN>
         @Internal
         abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
                 throws IOException;
+
+        @Internal
+        abstract FileCompactStrategy getCompactStrategy();
+
+        @Internal
+        abstract FileCompactor getFileCompactor();
+
+        @Internal
+        abstract BucketWriter<IN, String> createBucketWriter() throws IOException;
     }
 
     /** A builder for configuring the sink for row-wise encoding formats. */
@@ -226,6 +319,10 @@ public class FileSink<IN>
 
         private OutputFileConfig outputFileConfig;
 
+        private FileCompactStrategy compactStrategy;
+
+        private FileCompactor fileCompactor;
+
         protected RowFormatBuilder(
                 Path basePath, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
             this(
@@ -275,6 +372,12 @@ public class FileSink<IN>
             return self();
         }
 
+        public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
+            this.compactStrategy = checkNotNull(strategy);
+            this.fileCompactor = checkNotNull(compactor);
+            return self();
+        }
+
         /** Creates the actual sink. */
         public FileSink<IN> build() {
             return new FileSink<>(this);
@@ -282,6 +385,19 @@ public class FileSink<IN>
 
         @Override
         FileWriter<IN> createWriter(InitContext context) throws IOException {
+            OutputFileConfig writerFileConfig;
+            if (compactStrategy == null) {
+                writerFileConfig = outputFileConfig;
+            } else {
+                // Compaction is enabled. We always commit before compacting, so the files written
+                // by the writer should be hidden.
+                writerFileConfig =
+                        OutputFileConfig.builder()
+                                .withPartPrefix("." + outputFileConfig.getPartPrefix())
+                                .withPartSuffix(outputFileConfig.getPartSuffix())
+                                .build();
+            }
+
             return new FileWriter<>(
                     basePath,
                     context.metricGroup(),
@@ -289,7 +405,7 @@ public class FileSink<IN>
                     bucketFactory,
                     createBucketWriter(),
                     rollingPolicy,
-                    outputFileConfig,
+                    writerFileConfig,
                     context.getProcessingTimeService(),
                     bucketCheckInterval);
         }
@@ -300,6 +416,16 @@ public class FileSink<IN>
         }
 
         @Override
+        FileCompactStrategy getCompactStrategy() {
+            return compactStrategy;
+        }
+
+        @Override
+        FileCompactor getFileCompactor() {
+            return fileCompactor;
+        }
+
+        @Override
         SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
                 throws IOException {
             BucketWriter<IN, String> bucketWriter = createBucketWriter();
@@ -319,7 +445,7 @@ public class FileSink<IN>
                     bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
         }
 
-        private BucketWriter<IN, String> createBucketWriter() throws IOException {
+        BucketWriter<IN, String> createBucketWriter() throws IOException {
             return new RowWiseBucketWriter<>(
                     FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder);
         }
@@ -357,6 +483,10 @@ public class FileSink<IN>
 
         private OutputFileConfig outputFileConfig;
 
+        private FileCompactStrategy compactStrategy;
+
+        private FileCompactor fileCompactor;
+
         protected BulkFormatBuilder(
                 Path basePath,
                 BulkWriter.Factory<IN> writerFactory,
@@ -424,6 +554,12 @@ public class FileSink<IN>
                     outputFileConfig);
         }
 
+        public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
+            this.compactStrategy = checkNotNull(strategy);
+            this.fileCompactor = checkNotNull(compactor);
+            return self();
+        }
+
         /** Creates the actual sink. */
         public FileSink<IN> build() {
             return new FileSink<>(this);
@@ -431,6 +567,19 @@ public class FileSink<IN>
 
         @Override
         FileWriter<IN> createWriter(InitContext context) throws IOException {
+            OutputFileConfig writerFileConfig;
+            if (compactStrategy == null) {
+                writerFileConfig = outputFileConfig;
+            } else {
+                // Compaction is enabled. We always commit before compacting, so the files written
+                // by the writer should be hidden.
+                writerFileConfig =
+                        OutputFileConfig.builder()
+                                .withPartPrefix("." + outputFileConfig.getPartPrefix())
+                                .withPartSuffix(outputFileConfig.getPartSuffix())
+                                .build();
+            }
+
             return new FileWriter<>(
                     basePath,
                     context.metricGroup(),
@@ -438,7 +587,7 @@ public class FileSink<IN>
                     bucketFactory,
                     createBucketWriter(),
                     rollingPolicy,
-                    outputFileConfig,
+                    writerFileConfig,
                     context.getProcessingTimeService(),
                     bucketCheckInterval);
         }
@@ -449,6 +598,16 @@ public class FileSink<IN>
         }
 
         @Override
+        FileCompactStrategy getCompactStrategy() {
+            return compactStrategy;
+        }
+
+        @Override
+        FileCompactor getFileCompactor() {
+            return fileCompactor;
+        }
+
+        @Override
         SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
                 throws IOException {
             BucketWriter<IN, String> bucketWriter = createBucketWriter();
@@ -468,7 +627,7 @@ public class FileSink<IN>
                     bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
         }
 
-        private BucketWriter<IN, String> createBucketWriter() throws IOException {
+        BucketWriter<IN, String> createBucketWriter() throws IOException {
             return new BulkBucketWriter<>(
                     FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory);
         }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java
new file mode 100644
index 0000000..64fba99
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * An {@link OutputStreamBasedFileCompactor} implementation that simply concatenates the compacting
+ * files. The fileDelimiter, if provided, is added between neighbouring files.
+ */
+@PublicEvolving
+public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
+
+    private static final int CHUNK_SIZE = 4 * 1024 * 1024;
+
+    private final byte[] fileDelimiter;
+
+    public ConcatFileCompactor() {
+        this(null);
+    }
+
+    public ConcatFileCompactor(@Nullable byte[] fileDelimiter) {
+        this.fileDelimiter = fileDelimiter;
+    }
+
+    @Override
+    protected void doCompact(List<Path> inputFiles, OutputStream outputStream) throws Exception {
+        FileSystem fs = inputFiles.get(0).getFileSystem();
+        for (Path input : inputFiles) {
+            try (FSDataInputStream inputStream = fs.open(input)) {
+                copy(inputStream, outputStream);
+            }
+            if (fileDelimiter != null) {
+                outputStream.write(fileDelimiter);
+            }
+        }
+    }
+
+    private void copy(InputStream in, OutputStream out) throws IOException {
+        byte[] buf = new byte[CHUNK_SIZE];
+        int length;
+        while ((length = in.read(buf)) > 0) {
+            out.write(buf, 0, length);
+        }
+    }
+}
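
As a usage note for the delimiter constructor above, a file delimiter can keep records from
neighbouring files separated, e.g. with a newline (an illustrative sketch; the delimiter and
charset are arbitrary example choices):

    FileCompactor compactor =
            new ConcatFileCompactor("\n".getBytes(java.nio.charset.StandardCharsets.UTF_8));
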
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/DecoderBasedReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/DecoderBasedReader.java
new file mode 100644
index 0000000..55f4942
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/DecoderBasedReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RecordWiseFileCompactor.Reader} implementation that reads the file as an {@link
+ * FSDataInputStream} and decodes the records with the {@link Decoder}.
+ */
+@PublicEvolving
+public class DecoderBasedReader<T> implements RecordWiseFileCompactor.Reader<T> {
+    private final Decoder<T> decoder;
+
+    public DecoderBasedReader(Path path, Decoder<T> decoder) throws IOException {
+        this.decoder = checkNotNull(decoder);
+        InputStream input = path.getFileSystem().open(path);
+        this.decoder.open(input);
+    }
+
+    @Override
+    public T read() throws IOException {
+        return decoder.decodeNext();
+    }
+
+    @Override
+    public void close() throws Exception {
+        decoder.close();
+    }
+
+    /**
+     * A {@link Decoder} to decode the file content into the actual records.
+     *
+     * <p>A {@link Decoder} is generally the reverse of a {@link
+     * org.apache.flink.api.common.serialization.Encoder}.
+     *
+     * @param <T> The type of the records the reader is reading.
+     */
+    public interface Decoder<T> extends Serializable {
+
+        /** Prepares to start decoding the input stream. */
+        void open(InputStream input) throws IOException;
+
+        /**
+         * @return The next record decoded from the opened input stream, or null if no more
+         *     records are available.
+         */
+        T decodeNext() throws IOException;
+
+        /** Closes the open resources. The decoder is responsible for closing the input stream. */
+        void close() throws IOException;
+
+        /** Factory to create {@link Decoder}. */
+        interface Factory<T> extends Serializable {
+            Decoder<T> create();
+        }
+    }
+
+    /** Factory for {@link DecoderBasedReader}. */
+    public static class Factory<T> implements RecordWiseFileCompactor.Reader.Factory<T> {
+        private final Decoder.Factory<T> decoderFactory;
+
+        public Factory(Decoder.Factory<T> decoderFactory) {
+            this.decoderFactory = decoderFactory;
+        }
+
+        @Override
+        public DecoderBasedReader<T> createFor(Path path) throws IOException {
+            return new DecoderBasedReader<>(path, decoderFactory.create());
+        }
+    }
+}
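
To illustrate the Decoder contract defined above, a hypothetical decoder for newline-separated
integers might look like the following sketch (IntLineDecoder is illustrative only and is not
part of this commit):

    // Assumes imports of java.io.BufferedReader, java.io.IOException, java.io.InputStream
    // and java.io.InputStreamReader.
    class IntLineDecoder implements DecoderBasedReader.Decoder<Integer> {

        private BufferedReader reader;

        @Override
        public void open(InputStream input) {
            reader = new BufferedReader(new InputStreamReader(input));
        }

        @Override
        public Integer decodeNext() throws IOException {
            String line = reader.readLine();
            // Return null once the input is exhausted, as the contract above requires.
            return line == null ? null : Integer.parseInt(line.trim());
        }

        @Override
        public void close() throws IOException {
            // The decoder owns the input stream and closes it through the reader.
            reader.close();
        }
    }
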
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java
new file mode 100644
index 0000000..64c4145
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.sink.FileSink;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Strategy for compacting the files written in {@link FileSink} before committing. */
+@PublicEvolving
+public class FileCompactStrategy implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Compaction triggering strategies.
+    private final long sizeThreshold;
+    private final int numCheckpointsBeforeCompaction;
+
+    // Compaction executing strategies.
+    private final int numCompactThreads;
+
+    private FileCompactStrategy(
+            long sizeThreshold, int numCheckpointsBeforeCompaction, int numCompactThreads) {
+        this.sizeThreshold = sizeThreshold;
+        this.numCheckpointsBeforeCompaction = numCheckpointsBeforeCompaction;
+        this.numCompactThreads = numCompactThreads;
+    }
+
+    public long getSizeThreshold() {
+        return sizeThreshold;
+    }
+
+    public int getNumCheckpointsBeforeCompaction() {
+        return numCheckpointsBeforeCompaction;
+    }
+
+    public int getNumCompactThreads() {
+        return numCompactThreads;
+    }
+
+    /** Builder for {@link FileCompactStrategy}. */
+    public static class Builder {
+        private int numCheckpointsBeforeCompaction = -1;
+        private long sizeThreshold = -1;
+        private int numCompactThreads = -1;
+
+        public static FileCompactStrategy.Builder newBuilder() {
+            return new FileCompactStrategy.Builder();
+        }
+
+        /**
+         * Optional, compaction will be triggered once N checkpoints have passed since the last
+         * triggering; -1 by default, indicating no compaction on checkpoint.
+         */
+        public FileCompactStrategy.Builder enableCompactionOnCheckpoint(
+                int numCheckpointsBeforeCompaction) {
+            checkArgument(
+                    numCheckpointsBeforeCompaction > 0,
+                    "Number of checkpoints before compaction should be more than 0.");
+            this.numCheckpointsBeforeCompaction = numCheckpointsBeforeCompaction;
+            return this;
+        }
+
+        /**
+         * Optional, compaction will be triggered when the total size of compacting files reaches
+         * the threshold. -1 by default, indicating the size is unlimited.
+         */
+        public FileCompactStrategy.Builder setSizeThreshold(long sizeThreshold) {
+            this.sizeThreshold = sizeThreshold;
+            return this;
+        }
+
+        /** Optional, the count of compacting threads in a compactor operator, 1 by default. */
+        public FileCompactStrategy.Builder setNumCompactThreads(int numCompactThreads) {
+            checkArgument(numCompactThreads > 0, "Compact threads should be more than 0.");
+            this.numCompactThreads = numCompactThreads;
+            return this;
+        }
+
+        public FileCompactStrategy build() {
+            validate();
+            return new FileCompactStrategy(
+                    sizeThreshold, numCheckpointsBeforeCompaction, numCompactThreads);
+        }
+
+        private void validate() {
+            if (sizeThreshold < 0 && numCheckpointsBeforeCompaction <= 0) {
+                throw new IllegalArgumentException(
+                        "At least one trigger condition must be configured.");
+            }
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
new file mode 100644
index 0000000..b737fd3
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * The {@link FileCompactor} is responsible for compacting files into one file.
+ *
+ * <p>The {@link FileCompactor} should declare which type of {@link CompactingFileWriter} is
+ * required, and invoke the writer correspondingly.
+ */
+@PublicEvolving
+public interface FileCompactor extends Serializable {
+
+    /** @return the {@link CompactingFileWriter} type the compactor will use. */
+    CompactingFileWriter.Type getWriterType();
+
+    /**
+     * Compact the given files into one file.
+     *
+     * @param inputFiles the files to be compacted.
+     * @param writer the writer to write the compacted file.
+     * @throws Exception Thrown if an exception occurs during the compacting.
+     */
+    void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception;
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
new file mode 100644
index 0000000..8e6e4a1
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple {@link OutputStreamBasedFileCompactor} implementation that directly copies the content
+ * of the only input file to the output.
+ */
+@Internal
+public class IdenticalFileCompactor extends ConcatFileCompactor {
+
+    public IdenticalFileCompactor() {
+        super();
+    }
+
+    @Override
+    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
+        checkState(inputFiles.size() == 1, "IdenticalFileCompactor can only copy one input file");
+        super.compact(inputFiles, writer);
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java
new file mode 100644
index 0000000..215f6bd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+
+/**
+ * A {@link RecordWiseFileCompactor.Reader} implementation that reads the file using the {@link
+ * FileInputFormat}.
+ */
+@PublicEvolving
+public class InputFormatBasedReader<T> implements RecordWiseFileCompactor.Reader<T> {
+    private final Path path;
+    private final FileInputFormat<T> inputFormat;
+
+    public InputFormatBasedReader(Path path, FileInputFormat<T> inputFormat) throws IOException {
+        this.path = path;
+        this.inputFormat = inputFormat;
+        open();
+    }
+
+    private void open() throws IOException {
+        long len = path.getFileSystem().getFileStatus(path).getLen();
+        inputFormat.open(new FileInputSplit(0, path, 0, len, null));
+    }
+
+    @Override
+    public T read() throws IOException {
+        if (inputFormat.reachedEnd()) {
+            return null;
+        }
+        return inputFormat.nextRecord(null);
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputFormat.close();
+    }
+
+    /** Factory for {@link InputFormatBasedReader}. */
+    public static class Factory<T> implements RecordWiseFileCompactor.Reader.Factory<T> {
+        private final SerializableSupplierWithException<FileInputFormat<T>, IOException>
+                inputFormatFactory;
+
+        public Factory(
+                SerializableSupplierWithException<FileInputFormat<T>, IOException>
+                        inputFormatFactory) {
+            this.inputFormatFactory = inputFormatFactory;
+        }
+
+        @Override
+        public InputFormatBasedReader<T> createFor(Path path) throws IOException {
+            return new InputFormatBasedReader<>(path, inputFormatFactory.get());
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
new file mode 100644
index 0000000..2ac3805
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
+import org.apache.flink.util.CloseShieldOutputStream;
+
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Base class for {@link FileCompactor} implementations that use the {@link
+ * OutputStreamBasedCompactingFileWriter}.
+ */
+@PublicEvolving
+public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
+    @Override
+    public final CompactingFileWriter.Type getWriterType() {
+        return CompactingFileWriter.Type.OUTPUT_STREAM;
+    }
+
+    @Override
+    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
+        // The outputStream returned by OutputStreamBasedCompactingFileWriter#asOutputStream should
+        // not be closed here.
+        CloseShieldOutputStream outputStream =
+                new CloseShieldOutputStream(
+                        ((OutputStreamBasedCompactingFileWriter) writer).asOutputStream());
+        doCompact(inputFiles, outputStream);
+    }
+
+    protected abstract void doCompact(List<Path> inputFiles, OutputStream outputStream)
+            throws Exception;
+}
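
A custom compactor built on this base class only needs to implement doCompact. As an
illustration (NamePrefixingCompactor is hypothetical and not part of this change), one could
prefix each input file's bytes with its name before concatenating:

    // Assumes imports of org.apache.flink.core.fs.Path, org.apache.flink.core.fs.FSDataInputStream,
    // java.io.OutputStream and java.util.List.
    class NamePrefixingCompactor extends OutputStreamBasedFileCompactor {

        @Override
        protected void doCompact(List<Path> inputFiles, OutputStream out) throws Exception {
            for (Path input : inputFiles) {
                // Write a small header naming the source file, then append its raw bytes.
                out.write((input.getName() + ":\n").getBytes());
                try (FSDataInputStream in = input.getFileSystem().open(input)) {
                    byte[] buf = new byte[4096];
                    int len;
                    while ((len = in.read(buf)) > 0) {
                        out.write(buf, 0, len);
                    }
                }
            }
        }
    }
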
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
new file mode 100644
index 0000000..3244961
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A {@link FileCompactor} implementation that reads input files with a {@link Reader} and writes
+ * with the {@link RecordWiseCompactingFileWriter}.
+ */
+@PublicEvolving
+public class RecordWiseFileCompactor<IN> implements FileCompactor {
+    private final Reader.Factory<IN> readerFactory;
+
+    public RecordWiseFileCompactor(Reader.Factory<IN> readerFactory) {
+        this.readerFactory = readerFactory;
+    }
+
+    @Override
+    public final CompactingFileWriter.Type getWriterType() {
+        return CompactingFileWriter.Type.RECORD_WISE;
+    }
+
+    @Override
+    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
+        RecordWiseCompactingFileWriter<IN> recordWriter =
+                (RecordWiseCompactingFileWriter<IN>) writer;
+        for (Path input : inputFiles) {
+            try (Reader<IN> reader = readerFactory.createFor(input)) {
+                IN elem;
+                while ((elem = reader.read()) != null) {
+                    recordWriter.write(elem);
+                }
+            }
+        }
+    }
+
+    /**
+     * The reader that reads records from the compacting files.
+     *
+     * @param <T> The type of the records that are read.
+     */
+    public interface Reader<T> extends AutoCloseable {
+
+        /** @return The next record, or null if no more available. */
+        T read() throws IOException;
+
+        /**
+         * Factory for {@link Reader}.
+         *
+         * @param <T> The type of the records that are read.
+         */
+        interface Factory<T> extends Serializable {
+            /**
+             * @return A reader that reads elements from the given file.
+             * @throws IOException Thrown if an I/O error occurs when opening the file.
+             */
+            Reader<T> createFor(Path path) throws IOException;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/SimpleStringDecoder.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/SimpleStringDecoder.java
new file mode 100644
index 0000000..49661ba7
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/SimpleStringDecoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader.Decoder;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * A {@link Decoder} that reads data encoded by the {@link
+ * org.apache.flink.api.common.serialization.SimpleStringEncoder}, intended only for compaction.
+ * The original input type is unavailable, but reading string contents suffices to write the compacted file.
+ */
+@PublicEvolving
+public class SimpleStringDecoder implements Decoder<String> {
+
+    private BufferedReader reader;
+
+    @Override
+    public void open(InputStream input) throws IOException {
+        this.reader = new BufferedReader(new InputStreamReader(input));
+    }
+
+    @Override
+    public String decodeNext() throws IOException {
+        String nextLine = reader.readLine();
+        // The string read here will be written directly to the compacted file, so the '\n' should be appended.
+        return nextLine == null ? null : (nextLine + '\n');
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
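
Combining the record-wise pieces above, a compactor for files written with SimpleStringEncoder
could plausibly be assembled as follows (a sketch; the pairing is implied by the classes in this
commit, but the snippet itself is illustrative):

    // Read each input file line by line with SimpleStringDecoder and rewrite the records.
    RecordWiseFileCompactor<String> compactor =
            new RecordWiseFileCompactor<>(
                    new DecoderBasedReader.Factory<String>(SimpleStringDecoder::new));

The resulting compactor can then be passed to enableCompact together with a FileCompactStrategy.
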
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
new file mode 100644
index 0000000..23437a5
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Coordinator that coordinates file compaction for the {@link FileSink}.
+ *
+ * <p>All committable emitted from the writers are collected and packed into {@link
+ * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When
+ * a firing condition is met, the requests will be sent to the {@link CompactorOperator}.
+ *
+ * <p>The {@link CompactCoordinator} stores the committables that have not been fired yet as its
+ * state, and may emit a request at any time. A {@link CompactorOperator} must ensure that the
+ * ownership of the committables in a compact request has been successfully handed over from the
+ * coordinator before it actually performs the compaction.
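+ *
+ * <p>Concretely, a bucket's request fires once the total size of its pending files reaches the
+ * strategy's size threshold, or at the latest after the configured number of checkpoints; a
+ * negative value disables the respective condition.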
+ */
+@Internal
+public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                BoundedOneInput {
+
+    static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_compact_commit_raw_state", BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+
+    private final Map<String, CompactorRequest> packingRequests = new HashMap<>();
+    private final Map<String, CompactTrigger> triggers = new HashMap<>();
+
+    private ListState<FileSinkCommittable> remainingCommittableState;
+
+    public CompactCoordinator(
+            FileCompactStrategy strategy,
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) {
+        this.strategy = strategy;
+        this.committableSerializer = checkNotNull(committableSerializer);
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
+            throws Exception {
+        CommittableMessage<FileSinkCommittable> message = element.getValue();
+        if (message instanceof CommittableWithLineage) {
+            FileSinkCommittable committable =
+                    ((CommittableWithLineage<FileSinkCommittable>) element.getValue())
+                            .getCommittable();
+            if (packAndTrigger(committable)) {
+                fireAndPurge(committable.getBucketId());
+            }
+        }
+        // Otherwise the message is a CommittableSummary. The information it carries is not
+        // needed for compaction at present, so it is ignored.
+    }
+
+    private boolean packAndTrigger(FileSinkCommittable committable) {
+        String bucketId = committable.getBucketId();
+        CompactorRequest bucketRequest =
+                packingRequests.computeIfAbsent(bucketId, CompactorRequest::new);
+        if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) {
+            checkState(!committable.hasPendingFile());
+            // cleanup request, pass through directly
+            bucketRequest.addToPassthrough(committable);
+            return false;
+        }
+
+        if (!committable.hasPendingFile()) {
+            throw new RuntimeException("Committable to compact has no content.");
+        }
+
+        CompactTrigger trigger =
+                triggers.computeIfAbsent(bucketId, id -> new CompactTrigger(strategy));
+        CompactTriggerResult triggerResult = trigger.onElement(committable);
+        switch (triggerResult) {
+            case PASS_THROUGH:
+                bucketRequest.addToPassthrough(committable);
+                return false;
+            case CONTINUE:
+                bucketRequest.addToCompact(committable);
+                return false;
+            case FIRE_AND_PURGE:
+                bucketRequest.addToCompact(committable);
+                return true;
+            default:
+                throw new RuntimeException("Unexpected trigger result:" + triggerResult);
+        }
+    }
+
+    private void fireAndPurge(String bucketId) {
+        triggers.remove(bucketId);
+        CompactorRequest request = packingRequests.remove(bucketId);
+        if (request != null) {
+            output.collect(new StreamRecord<>(request));
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // emit all remaining requests
+        for (CompactorRequest request : packingRequests.values()) {
+            output.collect(new StreamRecord<>(request));
+        }
+        packingRequests.clear();
+        triggers.clear();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+
+        // trigger on checkpoint
+        List<String> bucketsToFire = new ArrayList<>(triggers.size());
+        for (Map.Entry<String, CompactTrigger> e : triggers.entrySet()) {
+            String bucketId = e.getKey();
+            CompactTrigger trigger = e.getValue();
+            if (trigger.onCheckpoint(checkpointId) == CompactTriggerResult.FIRE_AND_PURGE) {
+                bucketsToFire.add(bucketId);
+            }
+        }
+        bucketsToFire.forEach(this::fireAndPurge);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        List<FileSinkCommittable> remainingCommittable =
+                packingRequests.values().stream()
+                        .flatMap(r -> r.getCommittableToCompact().stream())
+                        .collect(Collectors.toList());
+        packingRequests.values().stream()
+                .flatMap(r -> r.getCommittableToPassthrough().stream())
+                .forEach(remainingCommittable::add);
+        remainingCommittableState.update(remainingCommittable);
+
+        // triggers are recomputed when restoring, so there is no need to store them
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        remainingCommittableState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC),
+                        committableSerializer);
+
+        Iterable<FileSinkCommittable> stateRemaining = remainingCommittableState.get();
+        if (stateRemaining != null) {
+            for (FileSinkCommittable committable : stateRemaining) {
+                // restore and redistribute
+                if (packAndTrigger(committable)) {
+                    fireAndPurge(committable.getBucketId());
+                }
+            }
+        }
+    }
+
+    enum CompactTriggerResult {
+        CONTINUE,
+        FIRE_AND_PURGE,
+        PASS_THROUGH
+    }
+
+    private static class CompactTrigger {
+        private final long threshold;
+        private final int numCheckpointsBeforeCompaction;
+
+        private long size;
+        private long triggeredCpId = -1;
+
+        CompactTrigger(FileCompactStrategy strategy) {
+            this.threshold = strategy.getSizeThreshold();
+            this.numCheckpointsBeforeCompaction = strategy.getNumCheckpointsBeforeCompaction();
+        }
+
+        public CompactTriggerResult onElement(FileSinkCommittable committable) {
+            if (threshold < 0) {
+                return CompactTriggerResult.CONTINUE;
+            }
+
+            PendingFileRecoverable file = committable.getPendingFile();
+            if (file == null) {
+                return CompactTriggerResult.PASS_THROUGH;
+            }
+
+            if (file.getPath() == null || !file.getPath().getName().startsWith(".")) {
+                // The file may have been written by a writer of an older version, or it will be
+                // visible once committed, so it cannot be compacted. Pass it through and add it
+                // to the results instead of the compacting files.
+                return CompactTriggerResult.PASS_THROUGH;
+            }
+
+            long curSize = file.getSize();
+            if (curSize < 0) {
+                // unrecognized committable that cannot be compacted, pass it through directly
+                return CompactTriggerResult.PASS_THROUGH;
+            }
+
+            size += curSize;
+            return size >= threshold
+                    ? CompactTriggerResult.FIRE_AND_PURGE
+                    : CompactTriggerResult.CONTINUE;
+        }
+
+        public CompactTriggerResult onCheckpoint(long checkpointId) {
+            if (numCheckpointsBeforeCompaction < 0) {
+                return CompactTriggerResult.CONTINUE;
+            }
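+            // Lazily anchor the counter at the checkpoint preceding the first one seen by this
+            // trigger, so the interval is counted from the trigger's creation.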
+            if (triggeredCpId < 0) {
+                triggeredCpId = checkpointId - 1;
+            }
+            return checkpointId - triggeredCpId >= numCheckpointsBeforeCompaction
+                    ? CompactTriggerResult.FIRE_AND_PURGE
+                    : CompactTriggerResult.CONTINUE;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorFactory.java
new file mode 100644
index 0000000..f06b09e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+
+/** Factory for {@link CompactCoordinator}. */
+@Internal
+public class CompactCoordinatorFactory extends AbstractStreamOperatorFactory<CompactorRequest>
+        implements OneInputStreamOperatorFactory<
+                CommittableMessage<FileSinkCommittable>, CompactorRequest> {
+
+    private final FileCompactStrategy strategy;
+    private final SerializableSupplierWithException<
+                    SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+            committableSerializerSupplier;
+
+    public CompactCoordinatorFactory(
+            FileCompactStrategy strategy,
+            SerializableSupplierWithException<
+                            SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+                    committableSerializerSupplier) {
+        this.strategy = strategy;
+        this.committableSerializerSupplier = committableSerializerSupplier;
+    }
+
+    @Override
+    public <T extends StreamOperator<CompactorRequest>> T createStreamOperator(
+            StreamOperatorParameters<CompactorRequest> parameters) {
+        try {
+            final CompactCoordinator compactOperator =
+                    new CompactCoordinator(strategy, committableSerializerSupplier.get());
+            compactOperator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            return (T) compactOperator;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Cannot create commit operator for "
+                            + parameters.getStreamConfig().getOperatorName(),
+                    e);
+        }
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return CompactCoordinator.class;
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
new file mode 100644
index 0000000..2b147ae
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import static org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.REMAINING_COMMITTABLE_RAW_STATES_DESC;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Handler that processes the state of {@link CompactCoordinator} when compaction is disabled. */
+public class CompactCoordinatorStateHandler
+        extends AbstractStreamOperator<
+                Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>,
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+
+    public CompactCoordinatorStateHandler(
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) {
+        this.committableSerializer = checkNotNull(committableSerializer);
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
+            throws Exception {
+        output.collect(new StreamRecord<>(Either.Left(element.getValue())));
+    }
+
+    @Override
+    public void endInput() throws Exception {}
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        ListState<FileSinkCommittable> remainingCommittableState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC),
+                        committableSerializer);
+
+        Iterable<FileSinkCommittable> stateRemaining = remainingCommittableState.get();
+        if (stateRemaining != null) {
+            for (FileSinkCommittable committable : stateRemaining) {
+                // each committable has to be wrapped in its own request, since compacting
+                // multiple files at once is not supported here
+                String bucketId = committable.getBucketId();
+                CompactorRequest request = new CompactorRequest(bucketId);
+                request.addToCompact(committable);
+                output.collect(new StreamRecord<>(Either.Right(request)));
+            }
+        }
+
+        // All remaining committables have been emitted, so the state can be cleared. From now on
+        // the operator is stateless and snapshotState is not necessary.
+        remainingCommittableState.clear();
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandlerFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandlerFactory.java
new file mode 100644
index 0000000..d4ebe11
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandlerFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+
+/** Factory for {@link CompactCoordinatorStateHandler}. */
+@Internal
+public class CompactCoordinatorStateHandlerFactory
+        extends AbstractStreamOperatorFactory<
+                Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
+        implements OneInputStreamOperatorFactory<
+                CommittableMessage<FileSinkCommittable>,
+                Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> {
+
+    private final SerializableSupplierWithException<
+                    SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+            committableSerializerSupplier;
+
+    public CompactCoordinatorStateHandlerFactory(
+            SerializableSupplierWithException<
+                            SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+                    committableSerializerSupplier) {
+        this.committableSerializerSupplier = committableSerializerSupplier;
+    }
+
+    @Override
+    public <
+                    T extends
+                            StreamOperator<
+                                            Either<
+                                                    CommittableMessage<FileSinkCommittable>,
+                                                    CompactorRequest>>>
+            T createStreamOperator(
+                    StreamOperatorParameters<
+                                    Either<
+                                            CommittableMessage<FileSinkCommittable>,
+                                            CompactorRequest>>
+                            parameters) {
+        try {
+            final CompactCoordinatorStateHandler handler =
+                    new CompactCoordinatorStateHandler(committableSerializerSupplier.get());
+            handler.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            return (T) handler;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Cannot create commit operator for "
+                            + parameters.getStreamConfig().getOperatorName(),
+                    e);
+        }
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return CompactCoordinatorStateHandler.class;
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
new file mode 100644
index 0000000..7e5c7f6
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The asynchronous file compaction service. */
+@Internal
+public class CompactService {
+    private static final String COMPACTED_PREFIX = "compacted-";
+
+    private final int numCompactThreads;
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    public CompactService(
+            int numCompactThreads,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.numCompactThreads = numCompactThreads;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    public void open() {
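+        // Bound the pool size by the number of available cores, and always keep at least one
+        // thread.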
+        compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(numCompactThreads, Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    public void submit(
+            CompactorRequest request,
+            CompletableFuture<Iterable<FileSinkCommittable>> resultFuture) {
+        compactService.submit(
+                () -> {
+                    try {
+                        Iterable<FileSinkCommittable> result = compact(request);
+                        resultFuture.complete(result);
+                    } catch (Exception e) {
+                        resultFuture.completeExceptionally(e);
+                    }
+                });
+    }
+
+    public void close() {
+        if (compactService != null) {
+            compactService.shutdownNow();
+        }
+    }
+
+    private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception {
+        List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough());
+
+        List<Path> compactingFiles = getCompactingPath(request, results);
+        if (compactingFiles.isEmpty()) {
+            return results;
+        }
+
+        Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
+        CompactingFileWriter compactingFileWriter =
+                bucketWriter.openNewCompactingFile(
+                        fileCompactor.getWriterType(),
+                        request.getBucketId(),
+                        targetPath,
+                        System.currentTimeMillis());
+        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit();
+
+        FileSinkCommittable compacted =
+                new FileSinkCommittable(request.getBucketId(), compactedPendingFile);
+        results.add(compacted);
+        for (Path f : compactingFiles) {
+            // cleanup compacted files
+            results.add(new FileSinkCommittable(request.getBucketId(), f));
+        }
+
+        return results;
+    }
+
+    // results: collects the pass-through committables as a side output
+    private List<Path> getCompactingPath(
+            CompactorRequest request, List<FileSinkCommittable> results) throws IOException {
+        List<FileSinkCommittable> compactingCommittable = request.getCommittableToCompact();
+        List<Path> compactingFiles = new ArrayList<>();
+
+        for (FileSinkCommittable committable : compactingCommittable) {
+            PendingFileRecoverable pendingFile = committable.getPendingFile();
+            checkState(
+                    pendingFile != null, "Illegal committable to compact, pending file is null.");
+
+            Path pendingPath = pendingFile.getPath();
+            checkState(
+                    pendingPath != null && pendingPath.getName().startsWith("."),
+                    "Illegal pending file to compact, path should start with . but is "
+                            + pendingPath);
+
+            // commit the pending file and compact the committed file
+            bucketWriter.recoverPendingFile(pendingFile).commitAfterRecovery();
+            compactingFiles.add(pendingPath);
+        }
+        return compactingFiles;
+    }
+
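+    // Derive the compacted file name from the first input file: drop the hidden '.' prefix and
+    // prepend "compacted-", so the result lives in the same directory as its inputs.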
+    private static Path assembleCompactedFilePath(Path uncompactedPath) {
+        String uncompactedName = uncompactedPath.getName();
+        if (uncompactedName.startsWith(".")) {
+            uncompactedName = uncompactedName.substring(1);
+        }
+        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
new file mode 100644
index 0000000..33365b9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that performs compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} are first held in memory and
+ * snapshotted into the state of a checkpoint. Once that checkpoint completes successfully, all
+ * requests received before it can be submitted. The results are emitted at the next {@link
+ * #prepareSnapshotPreBarrier} invocation after the compaction has finished, which ensures that
+ * committers receive exactly one CommittableSummary and the corresponding number of Committables
+ * for a single checkpoint.
+ */
+@Internal
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
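+    // Sentinel checkpoint id: requests stored under this key had already been submitted before
+    // the restored checkpoint and are re-submitted immediately in open().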
+    private static final long SUBMITTED_ID = -1L;
+
+    static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient CompactService compactService;
+
+    // requests collected during the current checkpoint
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    // checkpoint id to its collected requests; the requests are added when checkpointing and are
+    // moved to the compacting requests once submitted.
+    private final TreeMap<Long, List<CompactorRequest>> checkpointRequests = new TreeMap<>();
+    // the currently compacting requests and their completable futures; completed requests are
+    // removed and their results are emitted at #prepareSnapshotPreBarrier
+    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    // state combining checkpointRequests and compactingRequests; compactingRequests will be
+    // submitted again when restoring
+    private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
+
+    public CompactorOperator(
+            FileCompactStrategy strategy,
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.strategy = strategy;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                new CompactService(strategy.getNumCompactThreads(), fileCompactor, bucketWriter);
+        compactService.open();
+
+        // re-submit all requests that had already been submitted before the restored checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // treat the requests collected so far as belonging to a final (Long.MAX_VALUE) checkpoint
+        checkpointRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert checkpointRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add the requests collected during this checkpoint into the snapshot
+        checkpointRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // also snapshot all compacting requests, including those that were not finished when
+        // prepareSnapshotPreBarrier was invoked but have finished since then, because their
+        // results have not been emitted yet
+        Map<Long, List<CompactorRequest>> requests = new HashMap<>(checkpointRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // there can be more than one element if the state was redistributed after a
+                // change of parallelism
+                for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            checkpointRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // open() is called afterwards, so do not submit here: the compact service is not open yet
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (compactService != null) {
+            compactService.close();
+        }
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                checkpointRequests.subMap(Long.MIN_VALUE, true, checkpointId, true);
+        for (Entry<Long, List<CompactorRequest>> requestEntry : canSubmit.entrySet()) {
+            for (CompactorRequest request : requestEntry.getValue()) {
+                CompletableFuture<Iterable<FileSinkCommittable>> resultFuture =
+                        new CompletableFuture<>();
+                compactingRequests.add(new Tuple2<>(request, resultFuture));
+                compactService.submit(request, resultFuture);
+            }
+        }
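+        // The sub-map is a view backed by checkpointRequests, so clearing it also removes the
+        // submitted entries from the pending map.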
+        canSubmit.clear();
+    }
+
+    private void emitCompacted(@Nullable Long checkpointId) throws Exception {
+        List<FileSinkCommittable> compacted = new ArrayList<>();
+        Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
+                compactingRequests.iterator();
+        while (iter.hasNext()) {
+            Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> compacting =
+                    iter.next();
+            CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1;
+            if (future.isDone()) {
+                iter.remove();
+                // get() throws if the future completed exceptionally
+                for (FileSinkCommittable c : future.get()) {
+                    compacted.add(c);
+                }
+            }
+        }
+
+        if (compacted.isEmpty()) {
+            return;
+        }
+
+        // A summary must be sent before all results during this checkpoint
+        CommittableSummary<FileSinkCommittable> summary =
+                new CommittableSummary<>(
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        getRuntimeContext().getNumberOfParallelSubtasks(),
+                        checkpointId,
+                        compacted.size(),
+                        compacted.size(),
+                        0);
+        output.collect(new StreamRecord<>(summary));
+        for (FileSinkCommittable c : compacted) {
+            CommittableWithLineage<FileSinkCommittable> comm =
+                    new CommittableWithLineage<>(
+                            c, checkpointId, getRuntimeContext().getIndexOfThisSubtask());
+            output.collect(new StreamRecord<>(comm));
+        }
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<?> getAllTasksFuture() {
+        return CompletableFuture.allOf(
+                compactingRequests.stream().map(r -> r.f1).toArray(CompletableFuture[]::new));
+    }
+
+    static class RemainingRequestsSerializer
+            implements SimpleVersionedSerializer<Map<Long, List<CompactorRequest>>> {
+
+        private static final int MAGIC_NUMBER = 0xa946be83;
+
+        private final CompactorRequestSerializer requestSerializer;
+
+        RemainingRequestsSerializer(CompactorRequestSerializer requestSerializer) {
+            this.requestSerializer = requestSerializer;
+        }
+
+        @Override
+        public int getVersion() {
+            return 1;
+        }
+
+        @Override
+        public byte[] serialize(Map<Long, List<CompactorRequest>> remainingRequests)
+                throws IOException {
+            DataOutputSerializer out = new DataOutputSerializer(256);
+            out.writeInt(MAGIC_NUMBER);
+            serializeV1(remainingRequests, out);
+            return out.getCopyOfBuffer();
+        }
+
+        @Override
+        public Map<Long, List<CompactorRequest>> deserialize(int version, byte[] serialized)
+                throws IOException {
+            DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+            switch (version) {
+                case 1:
+                    validateMagicNumber(in);
+                    return deserializeV1(in);
+                default:
+                    throw new IOException("Unrecognized version or corrupt state: " + version);
+            }
+        }
+
+        private void serializeV1(
+                Map<Long, List<CompactorRequest>> request, DataOutputSerializer out)
+                throws IOException {
+            out.writeInt(request.size());
+            for (Map.Entry<Long, List<CompactorRequest>> e : request.entrySet()) {
+                out.writeLong(e.getKey());
+                SimpleVersionedSerialization.writeVersionAndSerializeList(
+                        requestSerializer, e.getValue(), out);
+            }
+        }
+
+        private Map<Long, List<CompactorRequest>> deserializeV1(DataInputDeserializer in)
+                throws IOException {
+            int size = in.readInt();
+            Map<Long, List<CompactorRequest>> requestMap = new HashMap<>(size);
+            for (int i = 0; i < size; i++) {
+                long cpId = in.readLong();
+                List<CompactorRequest> requests =
+                        SimpleVersionedSerialization.readVersionAndDeserializeList(
+                                requestSerializer, in);
+                requestMap.put(cpId, requests);
+            }
+            return requestMap;
+        }
+
+        private static void validateMagicNumber(DataInputView in) throws IOException {
+            int magicNumber = in.readInt();
+            if (magicNumber != MAGIC_NUMBER) {
+                throw new IOException(
+                        String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+            }
+        }
+    }
+}
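For illustration only (not part of the patch): the state serializer above can be exercised with a
simple round trip from code living in the same package. The committableSerializer variable stands
for any SimpleVersionedSerializer<FileSinkCommittable> instance, and the bucket id is made up.

    // Round trip through RemainingRequestsSerializer; -1L mirrors the SUBMITTED_ID sentinel.
    CompactorOperator.RemainingRequestsSerializer serializer =
            new CompactorOperator.RemainingRequestsSerializer(
                    new CompactorRequestSerializer(committableSerializer));
    Map<Long, List<CompactorRequest>> state =
            Collections.singletonMap(
                    -1L, Collections.singletonList(new CompactorRequest("bucket-0")));
    byte[] bytes = serializer.serialize(state);
    Map<Long, List<CompactorRequest>> restored =
            serializer.deserialize(serializer.getVersion(), bytes);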
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorFactory.java
new file mode 100644
index 0000000..edbac27
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+
+/** Factory for {@link CompactorOperator}. */
+@Internal
+public class CompactorOperatorFactory
+        extends AbstractStreamOperatorFactory<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperatorFactory<
+                CompactorRequest, CommittableMessage<FileSinkCommittable>> {
+
+    private final FileCompactStrategy strategy;
+    private final FileCompactor fileCompactor;
+    private final SerializableSupplierWithException<
+                    SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+            committableSerializerSupplier;
+    private final SerializableSupplierWithException<BucketWriter<?, String>, IOException>
+            bucketWriterProvider;
+
+    public CompactorOperatorFactory(
+            FileCompactStrategy strategy,
+            FileCompactor fileCompactor,
+            SerializableSupplierWithException<
+                            SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+                    committableSerializerSupplier,
+            SerializableSupplierWithException<BucketWriter<?, String>, IOException>
+                    bucketWriterProvider) {
+        this.strategy = strategy;
+        this.fileCompactor = fileCompactor;
+        this.committableSerializerSupplier = committableSerializerSupplier;
+        this.bucketWriterProvider = bucketWriterProvider;
+    }
+
+    @Override
+    public <T extends StreamOperator<CommittableMessage<FileSinkCommittable>>>
+            T createStreamOperator(
+                    StreamOperatorParameters<CommittableMessage<FileSinkCommittable>> parameters) {
+        try {
+            final CompactorOperator compactOperator =
+                    new CompactorOperator(
+                            strategy,
+                            committableSerializerSupplier.get(),
+                            fileCompactor,
+                            bucketWriterProvider.get());
+            compactOperator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            return (T) compactOperator;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Cannot create commit operator for "
+                            + parameters.getStreamConfig().getOperatorName(),
+                    e);
+        }
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return CompactorOperator.class;
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
new file mode 100644
index 0000000..70e6458
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.RemainingRequestsSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.REMAINING_REQUESTS_RAW_STATES_DESC;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Handler that processes the state of {@link CompactorOperator} when compaction is disabled. */
+@Internal
+public class CompactorOperatorStateHandler
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                        CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private final FileCompactor fileCompactor;
+
+    private transient CompactService compactService;
+
+    // Flag indicating that the in-progress file of the previous run has been received from the
+    // writer and processed.
+    private boolean writerStateDrained = false;
+
+    // Flag indicating that all compaction-related state has been drained; the operator can now
+    // pass everything through.
+    private boolean stateDrained = false;
+
+    // There may be an in-progress file from the previous run that has to be processed as a
+    // compaction request first, since otherwise the file would remain invisible after committing.
+    // We have to hold the summary and committables (of this run) and send them along with the
+    // result of this compaction request, as well as the results of any remaining requests of
+    // this operator.
+    private CommittableSummary<FileSinkCommittable> holdingSummary;
+    private List<CommittableMessage<FileSinkCommittable>> holdingMessages;
+    private final List<CommittableMessage<FileSinkCommittable>> compactingMessages =
+            new ArrayList<>();
+
+    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new ArrayList<>();
+    private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
+    private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining;
+
+    public CompactorOperatorStateHandler(
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
+            BucketWriter<?, String> bucketWriter) {
+        this.committableSerializer = committableSerializer;
+        this.bucketWriter = bucketWriter;
+
+        this.fileCompactor = new IdenticalFileCompactor();
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService = new CompactService(1, fileCompactor, bucketWriter);
+        compactService.open();
+
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) {
+                    for (CompactorRequest request : e.getValue()) {
+                        // Each toCompact must be wrapped in its own compact request, since
+                        // compacting multiple files together is not supported yet.
+                        List<FileSinkCommittable> toCompactList = request.getCommittableToCompact();
+                        List<FileSinkCommittable> toPassThrough =
+                                request.getCommittableToPassthrough();
+
+                        String bucketId =
+                                !toCompactList.isEmpty()
+                                        ? toCompactList.get(0).getBucketId()
+                                        : toPassThrough.get(0).getBucketId();
+
+                        for (FileSinkCommittable toCompact : toCompactList) {
+                            CompactorRequest compactRequest = new CompactorRequest(bucketId);
+                            compactRequest.addToCompact(toCompact);
+                            submit(compactRequest);
+                        }
+
+                        CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
+                        toPassThrough.forEach(passThroughRequest::addToPassthrough);
+                        submit(passThroughRequest);
+                    }
+                }
+            }
+        }
+        stateRemaining = null;
+    }
+
+    @Override
+    public void processElement(
+            StreamRecord<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> element)
+            throws Exception {
+        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record =
+                element.getValue();
+        if (stateDrained) {
+            // all input should be committable messages to pass through
+            output.collect(new StreamRecord<>(record.left()));
+            return;
+        }
+
+        if (record.isRight()) {
+            submit(element.getValue().right());
+            return;
+        }
+
+        CommittableMessage<FileSinkCommittable> message = record.left();
+        if (message instanceof CommittableSummary) {
+            checkState(holdingSummary == null, "Duplicate summary before the first checkpoint.");
+            holdingSummary = (CommittableSummary<FileSinkCommittable>) message;
+            holdingMessages = new ArrayList<>(holdingSummary.getNumberOfCommittables());
+        } else {
+            boolean compacting = false;
+            CommittableWithLineage<FileSinkCommittable> committableWithLineage =
+                    (CommittableWithLineage<FileSinkCommittable>) message;
+            if (committableWithLineage.getCommittable().hasPendingFile()) {
+                FileSinkCommittable committable = committableWithLineage.getCommittable();
+                PendingFileRecoverable pendingFile = committable.getPendingFile();
+                if (pendingFile.getPath() != null
+                        && pendingFile.getPath().getName().startsWith(".")) {
+                    // The pending file is the in-progress file of the previous run, which
+                    // should be committed and compacted before being sent to the committer.
+                    CompactorRequest request = new CompactorRequest(committable.getBucketId());
+                    request.addToCompact(committable);
+                    submit(request);
+
+                    compacting = true;
+                    compactingMessages.add(message);
+                } else {
+                    // A normal file is received, indicating the writer state is drained.
+                    writerStateDrained = true;
+                    if (compactingMessages.isEmpty() && compactingRequests.isEmpty()) {
+                        // No state needs to be handled, so the holding summary and all committable
+                        // messages can be sent eagerly.
+                        checkState(holdingSummary != null);
+                        output.collect(new StreamRecord<>(holdingSummary));
+                        holdingSummary = null;
+
+                        this.stateDrained = true;
+                        output.collect(new StreamRecord<>(committableWithLineage));
+                    }
+                }
+            }
+            if (!compacting && !stateDrained) {
+                // Compacting messages should not be added. If the state is drained, no further
+                // messages need to be added.
+                holdingMessages.add(message);
+            }
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+        if (stateDrained) {
+            return;
+        }
+        drain();
+        // The operator is stateless once drain is called. snapshotState is not necessary.
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        if (!stateDrained) {
+            drain();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (compactService != null) {
+            compactService.close();
+        }
+    }
+
+    private void submit(CompactorRequest request) {
+        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
+        compactService.submit(request, resultFuture);
+        compactingRequests.add(new Tuple2<>(request, resultFuture));
+    }
+
+    private void drain() throws ExecutionException, InterruptedException {
+        checkState(holdingSummary != null);
+        checkState(
+                holdingSummary.getNumberOfPendingCommittables()
+                                == holdingSummary.getNumberOfCommittables()
+                        && holdingSummary.getNumberOfCommittables()
+                                == holdingMessages.size() + compactingMessages.size());
+
+        Long checkpointId =
+                holdingSummary.getCheckpointId().isPresent()
+                        ? holdingSummary.getCheckpointId().getAsLong()
+                        : null;
+        int subtaskId = holdingSummary.getSubtaskId();
+
+        if (!compactingRequests.isEmpty()) {
+            CompletableFuture.allOf(
+                            compactingRequests.stream()
+                                    .map(r -> r.f1)
+                                    .toArray(CompletableFuture[]::new))
+                    .join();
+
+            for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>
+                    compacting : compactingRequests) {
+                CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1;
+                checkState(future.isDone());
+                // Exception is thrown if it's completed exceptionally
+                for (FileSinkCommittable c : future.get()) {
+                    holdingMessages.add(new CommittableWithLineage<>(c, checkpointId, subtaskId));
+                }
+            }
+        }
+
+        // Rebuild the summary so that it also accounts for the compacted committables appended above
+        CommittableSummary<FileSinkCommittable> summary =
+                new CommittableSummary<>(
+                        holdingSummary.getSubtaskId(),
+                        holdingSummary.getNumberOfSubtasks(),
+                        holdingSummary.getCheckpointId().isPresent()
+                                ? holdingSummary.getCheckpointId().getAsLong()
+                                : null,
+                        holdingMessages.size(),
+                        holdingMessages.size(),
+                        holdingSummary.getNumberOfFailedCommittables());
+        output.collect(new StreamRecord<>(summary));
+        for (CommittableMessage<FileSinkCommittable> committable : holdingMessages) {
+            output.collect(new StreamRecord<>(committable));
+        }
+
+        // All remaining requests should be done by now and their results have all been emitted.
+        // From now on the operator is stateless.
+        remainingRequestsState.clear();
+
+        compactingRequests.clear();
+        compactingMessages.clear();
+        holdingSummary = null;
+        holdingMessages = null;
+
+        if (writerStateDrained) {
+            // We can pass through everything if the writer state is also drained.
+            stateDrained = true;
+            compactService.close();
+            compactService = null;
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new CompactorRequestSerializer(committableSerializer)));
+
+        stateRemaining = remainingRequestsState.get();
+
+        // stateDrained cannot be determined here, since even if stateRemaining is empty, there
+        // may still be some requests from the coordinator and an in-progress file in the file
+        // writer.
+    }
+
+    @VisibleForTesting
+    public boolean isWriterStateDrained() {
+        return writerStateDrained;
+    }
+
+    @VisibleForTesting
+    public boolean isStateDrained() {
+        return stateDrained;
+    }
+}
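
The handler consumes a union input: committable messages from the upstream writer arrive on the Left of the Either, while leftover compaction requests restored from the previous run arrive on the Right. A minimal sketch of what such records look like, assuming a FileSinkCommittable named committable is at hand (checkpoint id 1, subtask 0 and bucket "bucket-0" are placeholder values):

    // Sketch only: committable messages travel on the Left, restored requests on the Right.
    Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> passThrough =
            Either.Left(new CommittableWithLineage<>(committable, 1L, 0));
    Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> leftover =
            Either.Right(new CompactorRequest("bucket-0"));
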
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandlerFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandlerFactory.java
new file mode 100644
index 0000000..2d36fee
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandlerFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+
+/** Factory for {@link CompactorOperatorStateHandler}. */
+@Internal
+public class CompactorOperatorStateHandlerFactory
+        extends AbstractStreamOperatorFactory<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperatorFactory<
+                Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                CommittableMessage<FileSinkCommittable>> {
+
+    private final SerializableSupplierWithException<
+                    SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+            committableSerializerSupplier;
+    private final SerializableSupplierWithException<BucketWriter<?, String>, IOException>
+            bucketWriterProvider;
+
+    public CompactorOperatorStateHandlerFactory(
+            SerializableSupplierWithException<
+                            SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+                    committableSerializerSupplier,
+            SerializableSupplierWithException<BucketWriter<?, String>, IOException>
+                    bucketWriterProvider) {
+        this.committableSerializerSupplier = committableSerializerSupplier;
+        this.bucketWriterProvider = bucketWriterProvider;
+    }
+
+    @Override
+    public <T extends StreamOperator<CommittableMessage<FileSinkCommittable>>>
+            T createStreamOperator(
+                    StreamOperatorParameters<CommittableMessage<FileSinkCommittable>> parameters) {
+        try {
+            final CompactorOperatorStateHandler handler =
+                    new CompactorOperatorStateHandler(
+                            committableSerializerSupplier.get(), bucketWriterProvider.get());
+            handler.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            return (T) handler;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Cannot create commit operator for "
+                            + parameters.getStreamConfig().getOperatorName(),
+                    e);
+        }
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return CompactorOperatorStateHandler.class;
+    }
+}
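
The factory only defers construction to the task side: both the committable serializer and the bucket writer are created from serializable suppliers when the operator is instantiated. A rough usage sketch, where pendingSerializer, inProgressSerializer and bucketWriter are hypothetical objects obtained from the sink's bucket writer setup:

    // Sketch only: the suppliers are evaluated on the task when createStreamOperator runs.
    CompactorOperatorStateHandlerFactory factory =
            new CompactorOperatorStateHandlerFactory(
                    () -> new FileSinkCommittableSerializer(pendingSerializer, inProgressSerializer),
                    () -> bucketWriter);
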
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
new file mode 100644
index 0000000..86bc78f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Request for compacting files of the {@link FileSink}. */
+@Internal
+public class CompactorRequest implements Serializable {
+    private final String bucketId;
+    private final List<FileSinkCommittable> committableToCompact;
+    private final List<FileSinkCommittable> committableToPassthrough;
+
+    public CompactorRequest(String bucketId) {
+        this.bucketId = bucketId;
+        this.committableToCompact = new ArrayList<>();
+        this.committableToPassthrough = new ArrayList<>();
+    }
+
+    public CompactorRequest(
+            String bucketId,
+            List<FileSinkCommittable> committableToCompact,
+            List<FileSinkCommittable> committableToPassthrough) {
+        this.bucketId = bucketId;
+        this.committableToCompact = committableToCompact;
+        this.committableToPassthrough = committableToPassthrough;
+    }
+
+    public void addToCompact(FileSinkCommittable committable) {
+        committableToCompact.add(committable);
+    }
+
+    public void addToPassthrough(FileSinkCommittable committable) {
+        committableToPassthrough.add(committable);
+    }
+
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    public List<FileSinkCommittable> getCommittableToCompact() {
+        return committableToCompact;
+    }
+
+    public List<FileSinkCommittable> getCommittableToPassthrough() {
+        return committableToPassthrough;
+    }
+}
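
A request groups the committables of one bucket into two lists: files that should be merged, and committables that are forwarded untouched. A minimal sketch, assuming small1, small2 and cleanup are hypothetical FileSinkCommittable instances of bucket "0":

    CompactorRequest request = new CompactorRequest("0");
    request.addToCompact(small1);       // pending files to be merged into one
    request.addToCompact(small2);
    request.addToPassthrough(cleanup);  // forwarded to the committer as-is
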
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestSerializer.java
new file mode 100644
index 0000000..e00e32f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Versioned serializer for {@link CompactorRequest}. */
+@Internal
+public class CompactorRequestSerializer implements SimpleVersionedSerializer<CompactorRequest> {
+
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+
+    private static final int MAGIC_NUMBER = 0x2fc61e19;
+
+    public CompactorRequestSerializer(
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) {
+        this.committableSerializer = committableSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(CompactorRequest request) throws IOException {
+        DataOutputSerializer out = new DataOutputSerializer(256);
+        out.writeInt(MAGIC_NUMBER);
+        serializeV1(request, out);
+        return out.getCopyOfBuffer();
+    }
+
+    @Override
+    public CompactorRequest deserialize(int version, byte[] serialized) throws IOException {
+        DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        switch (version) {
+            case 1:
+                validateMagicNumber(in);
+                return deserializeV1(in);
+            default:
+                throw new IOException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private void serializeV1(CompactorRequest request, DataOutputSerializer out)
+            throws IOException {
+        out.writeUTF(request.getBucketId());
+        SimpleVersionedSerialization.writeVersionAndSerializeList(
+                committableSerializer, request.getCommittableToCompact(), out);
+        SimpleVersionedSerialization.writeVersionAndSerializeList(
+                committableSerializer, request.getCommittableToPassthrough(), out);
+    }
+
+    private CompactorRequest deserializeV1(DataInputDeserializer in) throws IOException {
+        String bucketId = in.readUTF();
+        List<FileSinkCommittable> committableToCompact =
+                SimpleVersionedSerialization.readVersionAndDeserializeList(
+                        committableSerializer, in);
+        List<FileSinkCommittable> committableToPassthrough =
+                SimpleVersionedSerialization.readVersionAndDeserializeList(
+                        committableSerializer, in);
+        return new CompactorRequest(bucketId, committableToCompact, committableToPassthrough);
+    }
+
+    private static void validateMagicNumber(DataInputView in) throws IOException {
+        int magicNumber = in.readInt();
+        if (magicNumber != MAGIC_NUMBER) {
+            throw new IOException(
+                    String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+        }
+    }
+}
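
Serialization is versioned and the payload is guarded by a magic number, so corrupt or foreign state fails fast on restore. A round trip looks roughly as follows, assuming committableSerializer is a SimpleVersionedSerializer<FileSinkCommittable> (for example the FileSinkCommittableSerializer used in the tests below) and request is an existing CompactorRequest:

    CompactorRequestSerializer serializer = new CompactorRequestSerializer(committableSerializer);
    byte[] bytes = serializer.serialize(request);   // magic number + V1 payload, may throw IOException
    CompactorRequest restored = serializer.deserialize(serializer.getVersion(), bytes);
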
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfo.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfo.java
new file mode 100644
index 0000000..b9bbf87
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfo.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.util.function.SerializableSupplierWithException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Type information of {@link CompactorRequest}. Unsuitable for state. */
+@Internal
+public class CompactorRequestTypeInfo extends TypeInformation<CompactorRequest> {
+
+    private final SerializableSupplierWithException<
+                    SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+            committableSerializerSupplier;
+
+    public CompactorRequestTypeInfo(
+            SerializableSupplierWithException<
+                            SimpleVersionedSerializer<FileSinkCommittable>, IOException>
+                    committableSerializerSupplier) {
+        this.committableSerializerSupplier = committableSerializerSupplier;
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<CompactorRequest> getTypeClass() {
+        return CompactorRequest.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<CompactorRequest> createSerializer(ExecutionConfig config) {
+        return new SimpleVersionedSerializerTypeSerializerProxy<>(
+                () -> new CompactorRequestSerializer(createCommittableSerializer()));
+    }
+
+    @Override
+    public String toString() {
+        return "CompactorRequestTypeInfo{" + "serializer=" + createCommittableSerializer() + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !canEqual(o)) {
+            return false;
+        }
+        CompactorRequestTypeInfo that = (CompactorRequestTypeInfo) o;
+        return Objects.equals(
+                createCommittableSerializer().getClass(),
+                that.createCommittableSerializer().getClass());
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof CompactorRequestTypeInfo;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(createCommittableSerializer().getClass());
+    }
+
+    private SimpleVersionedSerializer<FileSinkCommittable> createCommittableSerializer() {
+        try {
+            return committableSerializerSupplier.get();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
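
This type information backs the records exchanged between the coordinator and the compactor; createSerializer simply wraps the versioned CompactorRequestSerializer in a TypeSerializer proxy. A short sketch, with pendingSerializer and inProgressSerializer again being hypothetical recoverable serializers:

    CompactorRequestTypeInfo typeInfo =
            new CompactorRequestTypeInfo(
                    () -> new FileSinkCommittableSerializer(pendingSerializer, inProgressSerializer));
    TypeSerializer<CompactorRequest> requestSerializer =
            typeInfo.createSerializer(new ExecutionConfig());
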
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializerMigrationTest.java
index 87968821..0d5c96e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializerMigrationTest.java
@@ -89,7 +89,7 @@ public class FileSinkCommittableSerializerMigrationTest {
 
         OutputStreamBasedInProgressFileRecoverable recoverable =
                 new OutputStreamBasedInProgressFileRecoverable(resumeRecoverable);
-        FileSinkCommittable committable = new FileSinkCommittable(recoverable);
+        FileSinkCommittable committable = new FileSinkCommittable("0", recoverable);
 
         byte[] bytes = serializer.serialize(committable);
         Files.write(path.resolve("committable"), bytes);
@@ -134,7 +134,7 @@ public class FileSinkCommittableSerializerMigrationTest {
 
         OutputStreamBasedPendingFileRecoverable recoverable =
                 new OutputStreamBasedPendingFileRecoverable(commitRecoverable);
-        FileSinkCommittable committable = new FileSinkCommittable(recoverable);
+        FileSinkCommittable committable = new FileSinkCommittable("0", recoverable);
 
         byte[] bytes = serializer.serialize(committable);
         Files.write(path.resolve("committable"), bytes);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/AbstractCompactTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/AbstractCompactTestBase.java
new file mode 100644
index 0000000..cffada6
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/AbstractCompactTestBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** Test base for compact operators. */
+public abstract class AbstractCompactTestBase {
+
+    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+    public static final int TARGET_SIZE = 9;
+
+    Path folder;
+
+    @Before
+    public void before() throws IOException {
+        folder = new Path(TEMP_FOLDER.newFolder().getPath());
+    }
+
+    Path newFile(String name, int len) throws IOException {
+        Path path = new Path(folder, name);
+        File file = new File(path.getPath());
+        file.delete();
+        file.createNewFile();
+
+        try (FileOutputStream out = new FileOutputStream(file)) {
+            for (int i = 0; i < len; i++) {
+                out.write(i);
+            }
+        }
+        return path;
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
new file mode 100644
index 0000000..0fbfb9b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy.Builder;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorStateHandler;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils.TestInProgressFileRecoverable;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils.TestPendingFileRecoverable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Test for {@link CompactCoordinator}. */
+public class CompactCoordinatorTest extends AbstractCompactTestBase {
+
+    @Test
+    public void testSizeThreshold() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable committable0 = committable("0", ".0", 5);
+            FileSinkCommittable committable1 = committable("0", ".1", 6);
+            harness.processElement(message(committable0));
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.processElement(message(committable1));
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(1, results.size());
+            assertToCompact(results.get(0), committable0, committable1);
+
+            harness.processElement(message(committable("0", ".2", 5)));
+            harness.processElement(message(committable("1", ".0", 5)));
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+        }
+    }
+
+    @Test
+    public void testCompactOnCheckpoint() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().enableCompactionOnCheckpoint(1).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable committable0 = committable("0", ".0", 5);
+            FileSinkCommittable committable1 = committable("0", ".1", 6);
+            FileSinkCommittable committable2 = committable("0", ".2", 5);
+            FileSinkCommittable committable3 = committable("1", ".0", 5);
+
+            harness.processElement(message(committable0));
+            harness.processElement(message(committable1));
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            harness.snapshot(1, 1);
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+
+            harness.processElement(message(committable2));
+            harness.processElement(message(committable3));
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(2);
+            harness.snapshot(2, 2);
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+            assertToCompact(results.get(0), committable0, committable1);
+            assertToCompact(results.get(1), committable2);
+            assertToCompact(results.get(2), committable3);
+        }
+    }
+
+    @Test
+    public void testCompactOverMultipleCheckpoints() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().enableCompactionOnCheckpoint(3).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable committable0 = committable("0", ".0", 5);
+            FileSinkCommittable committable1 = committable("0", ".1", 6);
+
+            harness.processElement(message(committable0));
+            harness.processElement(message(committable1));
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            harness.snapshot(1, 1);
+            harness.prepareSnapshotPreBarrier(2);
+            harness.snapshot(2, 2);
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(3);
+            harness.snapshot(3, 3);
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(1, results.size());
+            assertToCompact(results.get(0), committable0, committable1);
+        }
+    }
+
+    @Test
+    public void testCompactOnEndOfInput() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable committable0 = committable("0", ".0", 5);
+
+            harness.processElement(message(committable0));
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            harness.snapshot(1, 1);
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.endInput();
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(1, results.size());
+            assertToCompact(results.get(0), committable0);
+        }
+    }
+
+    @Test
+    public void testPassthrough() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable cleanupToPassthrough = cleanupInprogress("0", ".0", 1);
+            FileSinkCommittable sizeUnavailableToPassthrough = committable("0", ".1", -1);
+            FileSinkCommittable pathNotHidToPassThrough = committable("0", "2", -1);
+            FileSinkCommittable normalCommittable = committable("0", ".3", 10);
+
+            harness.processElement(message(cleanupToPassthrough));
+            harness.processElement(message(sizeUnavailableToPassthrough));
+            harness.processElement(message(pathNotHidToPassThrough));
+            harness.processElement(message(normalCommittable));
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(1, results.size());
+            assertToCompact(results.get(0), normalCommittable);
+            assertToPassthrough(
+                    results.get(0),
+                    cleanupToPassthrough,
+                    sizeUnavailableToPassthrough,
+                    pathNotHidToPassThrough);
+        }
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        FileSinkCommittable committable0 = committable("0", ".0", 5);
+        FileSinkCommittable committable1 = committable("0", ".1", 6);
+        FileSinkCommittable committable2 = committable("0", ".2", 5);
+        FileSinkCommittable committable3 = committable("1", ".0", 5);
+
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(message(committable0));
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            state = harness.snapshot(1, 1);
+        }
+
+        coordinator = new CompactCoordinator(strategy, getTestCommittableSerializer());
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            harness.processElement(message(committable1));
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+
+            harness.processElement(message(committable2));
+            harness.processElement(message(committable3));
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+
+            harness.endInput();
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+            assertToCompact(results.get(0), committable0, committable1);
+            assertToCompact(results.get(1), committable2);
+            assertToCompact(results.get(2), committable3);
+        }
+    }
+
+    @Test
+    public void testRestoreWithChangedStrategy() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(100).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        FileSinkCommittable committable0 = committable("0", ".0", 5);
+        FileSinkCommittable committable1 = committable("0", ".1", 6);
+        FileSinkCommittable committable2 = committable("0", ".2", 7);
+        FileSinkCommittable committable3 = committable("0", ".3", 8);
+        FileSinkCommittable committable4 = committable("0", ".4", 9);
+        FileSinkCommittable committable5 = committable("0", ".5", 2);
+
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(message(committable0));
+            harness.processElement(message(committable1));
+            harness.processElement(message(committable2));
+            harness.processElement(message(committable3));
+            harness.processElement(message(committable4));
+
+            harness.prepareSnapshotPreBarrier(1);
+            state = harness.snapshot(1, 1);
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+        }
+
+        FileCompactStrategy changedStrategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator changedCoordinator =
+                new CompactCoordinator(changedStrategy, getTestCommittableSerializer());
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(changedCoordinator)) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            Assert.assertEquals(2, harness.extractOutputValues().size());
+
+            harness.processElement(message(committable5));
+
+            List<CompactorRequest> results = harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+            assertToCompact(results.get(0), committable0, committable1);
+            assertToCompact(results.get(1), committable2, committable3);
+            assertToCompact(results.get(2), committable4, committable5);
+        }
+    }
+
+    @Test
+    public void testStateHandler() throws Exception {
+        FileCompactStrategy strategy = Builder.newBuilder().setSizeThreshold(10).build();
+        CompactCoordinator coordinator =
+                new CompactCoordinator(strategy, getTestCommittableSerializer());
+
+        // with . prefix
+        FileSinkCommittable committable0 = committable("0", ".0", 5);
+        FileSinkCommittable committable1 = committable("0", ".1", 6);
+
+        // without . prefix
+        FileSinkCommittable committable2 = committable("0", "2", 6);
+
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>
+                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(message(committable0));
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            state = harness.snapshot(1, 1);
+        }
+
+        CompactCoordinatorStateHandler handler =
+                new CompactCoordinatorStateHandler(getTestCommittableSerializer());
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<FileSinkCommittable>,
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
+                harness = new OneInputStreamOperatorTestHarness<>(handler)) {
+            harness.setup(
+                    new EitherSerializer<>(
+                            new SimpleVersionedSerializerTypeSerializerProxy<>(
+                                    () ->
+                                            new CommittableMessageSerializer<>(
+                                                    getTestCommittableSerializer())),
+                            new SimpleVersionedSerializerTypeSerializerProxy<>(
+                                    () ->
+                                            new CompactorRequestSerializer(
+                                                    getTestCommittableSerializer()))));
+            harness.initializeState(state);
+            harness.open();
+
+            Assert.assertEquals(1, harness.extractOutputValues().size());
+
+            harness.processElement(message(committable1));
+            harness.processElement(message(committable2));
+
+            List<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> results =
+                    harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+
+            // restored request
+            Assert.assertTrue(results.get(0).isRight());
+            assertToCompact(results.get(0).right(), committable0);
+
+            // committable with . prefix should also be passed through
+            Assert.assertTrue(
+                    results.get(1).isLeft()
+                            && results.get(1).left() instanceof CommittableWithLineage);
+            Assert.assertEquals(
+                    ((CommittableWithLineage<FileSinkCommittable>) results.get(1).left())
+                            .getCommittable(),
+                    committable1);
+
+            // committable without . prefix should be passed through normally
+            Assert.assertTrue(
+                    results.get(2).isLeft()
+                            && results.get(2).left() instanceof CommittableWithLineage);
+            Assert.assertEquals(
+                    ((CommittableWithLineage<FileSinkCommittable>) results.get(2).left())
+                            .getCommittable(),
+                    committable2);
+        }
+    }
+
+    private StreamRecord<CommittableMessage<FileSinkCommittable>> message(
+            FileSinkCommittable committable) {
+        return new StreamRecord<>(new CommittableWithLineage<>(committable, 1L, 0), 0L);
+    }
+
+    private FileSinkCommittable committable(String bucketId, String name, int size)
+            throws IOException {
+        // put bucketId after name to keep the possible '.' prefix in name
+        return new FileSinkCommittable(
+                bucketId,
+                new TestPendingFileRecoverable(
+                        newFile(name + "_" + bucketId, size <= 0 ? 1 : size), size));
+    }
+
+    private FileSinkCommittable cleanupInprogress(String bucketId, String name, int size)
+            throws IOException {
+        Path toCleanup = newFile(name + "_" + bucketId, size);
+        return new FileSinkCommittable(
+                bucketId, new TestInProgressFileRecoverable(toCleanup, size));
+    }
+
+    private SimpleVersionedSerializer<FileSinkCommittable> getTestCommittableSerializer() {
+        return new FileSinkCommittableSerializer(
+                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                        FileSinkTestUtils.TestPendingFileRecoverable::new),
+                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                        FileSinkTestUtils.TestInProgressFileRecoverable::new));
+    }
+
+    private void assertToCompact(CompactorRequest request, FileSinkCommittable... committables) {
+        List<FileSinkCommittable> committableToCompact = request.getCommittableToCompact();
+        Assert.assertArrayEquals(committables, committableToCompact.toArray());
+    }
+
+    private void assertToPassthrough(
+            CompactorRequest request, FileSinkCommittable... committables) {
+        List<FileSinkCommittable> committableToPassthrough = request.getCommittableToPassthrough();
+        Assert.assertArrayEquals(committables, committableToPassthrough.toArray());
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
new file mode 100644
index 0000000..6b7f397
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler;
+import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils.TestInProgressFileRecoverable;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils.TestPendingFileRecoverable;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link CompactorOperator}. */
+public class CompactorOperatorTest extends AbstractCompactTestBase {
+
+    @Test
+    public void testCompact() throws Exception {
+        FileCompactor fileCompactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+        CompactorOperator compactor = createTestOperator(fileCompactor);
+
+        try (OneInputStreamOperatorTestHarness<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(compactor)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(
+                    request(
+                            "0",
+                            Arrays.asList(committable("0", ".0", 5), committable("0", ".1", 5)),
+                            null));
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            harness.snapshot(1, 1L);
+            harness.notifyOfCompletedCheckpoint(1);
+
+            compactor.getAllTasksFuture().join();
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(2);
+
+            // 1 summary + 1 compacted + 2 cleanup
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(4, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(3);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(committable("0", "compacted-0", 10));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2))
+                    .hasCommittable(cleanupPath("0", ".0"));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(3))
+                    .hasCommittable(cleanupPath("0", ".1"));
+        }
+    }
+
+    @Test
+    public void testPassthrough() throws Exception {
+        FileCompactor fileCompactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+        CompactorOperator compactor = createTestOperator(fileCompactor);
+
+        try (OneInputStreamOperatorTestHarness<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(compactor)) {
+            harness.setup();
+            harness.open();
+
+            FileSinkCommittable cleanupInprogressRequest = cleanupInprogress("0", "0", 1);
+            FileSinkCommittable cleanupPathRequest = cleanupPath("0", "1");
+
+            harness.processElement(
+                    request("0", null, Collections.singletonList(cleanupInprogressRequest)));
+            harness.processElement(
+                    request("0", null, Collections.singletonList(cleanupPathRequest)));
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(1);
+            harness.snapshot(1, 1L);
+            harness.notifyOfCompletedCheckpoint(1);
+
+            compactor.getAllTasksFuture().join();
+
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            harness.prepareSnapshotPreBarrier(2);
+
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(2);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(cleanupInprogressRequest);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2))
+                    .hasCommittable(cleanupPathRequest);
+        }
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        FileCompactor fileCompactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+        CompactorOperator compactor = createTestOperator(fileCompactor);
+
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(compactor)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(
+                    request(
+                            "0",
+                            Arrays.asList(committable("0", ".0", 5), committable("0", ".1", 5)),
+                            null));
+            harness.snapshot(1, 1L);
+
+            harness.processElement(
+                    request(
+                            "0",
+                            Arrays.asList(committable("0", ".2", 5), committable("0", ".3", 5)),
+                            null));
+
+            harness.notifyOfCompletedCheckpoint(1);
+
+            // request 1 is submitted and request 2 is pending
+            state = harness.snapshot(2, 2L);
+        }
+
+        compactor = createTestOperator(fileCompactor);
+        try (OneInputStreamOperatorTestHarness<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(compactor)) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            // request 1 should be submitted
+            compactor.getAllTasksFuture().join();
+            harness.prepareSnapshotPreBarrier(3);
+
+            // the result of request 1 should be emitted
+            Assert.assertEquals(4, harness.extractOutputValues().size());
+
+            harness.snapshot(3, 3L);
+            harness.notifyOfCompletedCheckpoint(3L);
+
+            // request 2 should be submitted
+            compactor.getAllTasksFuture().join();
+            harness.prepareSnapshotPreBarrier(4);
+
+            // the result of request 2 should be emitted
+            Assert.assertEquals(8, harness.extractOutputValues().size());
+
+            // (1 summary + 1 compacted + 2 cleanup) * 2
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(8, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(3);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(committable("0", "compacted-0", 10));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2))
+                    .hasCommittable(cleanupPath("0", ".0"));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(3))
+                    .hasCommittable(cleanupPath("0", ".1"));
+
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(4))
+                    .hasPendingCommittables(3);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(5))
+                    .hasCommittable(committable("0", "compacted-2", 10));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(6))
+                    .hasCommittable(cleanupPath("0", ".2"));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(7))
+                    .hasCommittable(cleanupPath("0", ".3"));
+        }
+    }
+
+    @Test
+    public void testStateHandler() throws Exception {
+        FileCompactor fileCompactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+        CompactorOperator compactor = createTestOperator(fileCompactor);
+
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(compactor)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(
+                    request(
+                            "0",
+                            Arrays.asList(committable("0", ".0", 1), committable("0", ".1", 2)),
+                            null));
+            harness.snapshot(1, 1L);
+
+            harness.processElement(
+                    request(
+                            "0",
+                            Arrays.asList(committable("0", ".2", 3), committable("0", ".3", 4)),
+                            null));
+
+            harness.notifyOfCompletedCheckpoint(1);
+
+            // request 1 is submitted and request 2 is pending
+            state = harness.snapshot(2, 2L);
+        }
+
+        CompactorOperatorStateHandler handler =
+                new CompactorOperatorStateHandler(
+                        getTestCommittableSerializer(), createTestBucketWriter());
+        try (OneInputStreamOperatorTestHarness<
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                        CommittableMessage<FileSinkCommittable>>
+                harness = new OneInputStreamOperatorTestHarness<>(handler)) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            // remaining requests from coordinator
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Right(
+                                    request(
+                                                    "0",
+                                                    Collections.singletonList(
+                                                            committable("0", ".4", 5)),
+                                                    null)
+                                            .getValue())));
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Right(
+                                    request(
+                                                    "0",
+                                                    Collections.singletonList(
+                                                            committable("0", ".5", 6)),
+                                                    null)
+                                            .getValue())));
+
+            harness.processElement(
+                    new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 3L, 2, 2, 0))));
+
+            // remaining in-progress file from file writer
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Left(
+                                    new CommittableWithLineage<>(
+                                            committable("0", ".6", 7), 3L, 0))));
+
+            // new pending file written this time
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Left(
+                                    new CommittableWithLineage<>(
+                                            committable("0", "7", 8), 3L, 0))));
+
+            Assert.assertTrue(handler.isWriterStateDrained());
+            Assert.assertFalse(handler.isStateDrained());
+
+            // the result should not be emitted yet, but all requests should already be submitted
+            Assert.assertEquals(0, harness.extractOutputValues().size());
+
+            compactor.getAllTasksFuture().join();
+            // state should be drained, and all results and holding messages should be emitted
+            harness.prepareSnapshotPreBarrier(3);
+
+            Assert.assertTrue(handler.isStateDrained());
+
+            // summary should be merged into one
+            // 1 summary + 1 passthrough committable + (1 compacted committable + 1 cleanup) * 7
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(16, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(15);
+
+            List<FileSinkCommittable> expectedResult =
+                    Arrays.asList(
+                            committable("0", "7", 8),
+                            committable("0", "compacted-0", 1),
+                            cleanupPath("0", ".0"),
+                            committable("0", "compacted-1", 2),
+                            cleanupPath("0", ".1"),
+                            committable("0", "compacted-2", 3),
+                            cleanupPath("0", ".2"),
+                            committable("0", "compacted-3", 4),
+                            cleanupPath("0", ".3"),
+                            committable("0", "compacted-4", 5),
+                            cleanupPath("0", ".4"),
+                            committable("0", "compacted-5", 6),
+                            cleanupPath("0", ".5"),
+                            committable("0", "compacted-6", 7),
+                            cleanupPath("0", ".6"));
+
+            for (int i = 1; i < results.size(); ++i) {
+                SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i))
+                        .hasCommittable(expectedResult.get(i - 1));
+            }
+        }
+    }
+
+    private StreamRecord<CompactorRequest> request(
+            String bucketId,
+            List<FileSinkCommittable> toCompact,
+            List<FileSinkCommittable> toPassthrough) {
+        return new StreamRecord<>(
+                new CompactorRequest(
+                        bucketId,
+                        toCompact == null ? new ArrayList<>() : toCompact,
+                        toPassthrough == null ? new ArrayList<>() : toPassthrough),
+                0L);
+    }
+
+    private FileSinkCommittable committable(String bucketId, String name, int size)
+            throws IOException {
+        // put bucketId after name to keep the possible '.' prefix in name
+        return new FileSinkCommittable(
+                bucketId,
+                new TestPendingFileRecoverable(
+                        newFile(name + "_" + bucketId, size <= 0 ? 1 : size), size));
+    }
+
+    private FileSinkCommittable cleanupInprogress(String bucketId, String name, int size)
+            throws IOException {
+        Path toCleanup = newFile(name + "_" + bucketId, size);
+        return new FileSinkCommittable(
+                bucketId, new TestInProgressFileRecoverable(toCleanup, size));
+    }
+
+    private FileSinkCommittable cleanupPath(String bucketId, String name) throws IOException {
+        Path toCleanup = newFile(name + "_" + bucketId, 1);
+        return new FileSinkCommittable(bucketId, toCleanup);
+    }
+
+    private SimpleVersionedSerializer<FileSinkCommittable> getTestCommittableSerializer() {
+        return new FileSinkCommittableSerializer(
+                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                        TestPendingFileRecoverable::new),
+                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                        TestInProgressFileRecoverable::new));
+    }
+
+    private CompactorOperator createTestOperator(FileCompactor compactor) {
+        return new CompactorOperator(
+                FileCompactStrategy.Builder.newBuilder()
+                        .setNumCompactThreads(2)
+                        .enableCompactionOnCheckpoint(1)
+                        .build(),
+                getTestCommittableSerializer(),
+                compactor,
+                createTestBucketWriter());
+    }
+
+    private BucketWriter<?, String> createTestBucketWriter() {
+        return new BucketWriter<Integer, String>() {
+
+            @Override
+            public InProgressFileWriter<Integer, String> openNewInProgressFile(
+                    String bucketId, Path path, long creationTime) throws IOException {
+                return new InProgressFileWriter<Integer, String>() {
+                    BufferedWriter writer;
+                    long size = 0L;
+
+                    @Override
+                    public void write(Integer element, long currentTime) throws IOException {
+                        if (writer == null) {
+                            writer = new BufferedWriter(new FileWriter(path.toString()));
+                        }
+                        writer.write(element);
+                        size += 1;
+                    }
+
+                    @Override
+                    public InProgressFileRecoverable persist() throws IOException {
+                        return new TestInProgressFileRecoverable(path, size);
+                    }
+
+                    @Override
+                    public PendingFileRecoverable closeForCommit() throws IOException {
+                        return new TestPendingFileRecoverable(path, size);
+                    }
+
+                    @Override
+                    public void dispose() {}
+
+                    @Override
+                    public String getBucketId() {
+                        return bucketId;
+                    }
+
+                    @Override
+                    public long getCreationTime() {
+                        return 0;
+                    }
+
+                    @Override
+                    public long getSize() throws IOException {
+                        return size;
+                    }
+
+                    @Override
+                    public long getLastUpdateTime() {
+                        return 0;
+                    }
+                };
+            }
+
+            @Override
+            public InProgressFileWriter<Integer, String> resumeInProgressFileFrom(
+                    String s, InProgressFileRecoverable inProgressFileSnapshot, long creationTime)
+                    throws IOException {
+                return null;
+            }
+
+            @Override
+            public WriterProperties getProperties() {
+                return null;
+            }
+
+            @Override
+            public PendingFile recoverPendingFile(PendingFileRecoverable pendingFileRecoverable)
+                    throws IOException {
+                return new PendingFile() {
+                    @Override
+                    public void commit() throws IOException {
+                        TestPendingFileRecoverable testRecoverable =
+                                (TestPendingFileRecoverable) pendingFileRecoverable;
+                        if (testRecoverable.getPath() != null) {
+                            if (!testRecoverable
+                                    .getPath()
+                                    .equals(testRecoverable.getUncommittedPath())) {
+                                testRecoverable
+                                        .getPath()
+                                        .getFileSystem()
+                                        .rename(
+                                                testRecoverable.getUncommittedPath(),
+                                                testRecoverable.getPath());
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void commitAfterRecovery() throws IOException {
+                        commit();
+                    }
+                };
+            }
+
+            @Override
+            public boolean cleanupInProgressFileRecoverable(
+                    InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
+                return false;
+            }
+
+            @Override
+            public CompactingFileWriter openNewCompactingFile(
+                    CompactingFileWriter.Type type, String bucketId, Path path, long creationTime)
+                    throws IOException {
+                if (type == CompactingFileWriter.Type.RECORD_WISE) {
+                    return openNewInProgressFile(bucketId, path, creationTime);
+                } else {
+                    FileOutputStream fileOutputStream = new FileOutputStream(path.toString());
+                    return new OutputStreamBasedCompactingFileWriter() {
+
+                        @Override
+                        public OutputStream asOutputStream() throws IOException {
+                            return fileOutputStream;
+                        }
+
+                        @Override
+                        public PendingFileRecoverable closeForCommit() throws IOException {
+                            fileOutputStream.flush();
+                            return new TestPendingFileRecoverable(
+                                    path, fileOutputStream.getChannel().position());
+                        }
+                    };
+                }
+            }
+        };
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfoTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfoTest.java
new file mode 100644
index 0000000..c7427b8
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequestTypeInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
+import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+
+/** Test for {@link CompactorRequestTypeInfo}. */
+public class CompactorRequestTypeInfoTest
+        extends TypeInformationTestBase<CompactorRequestTypeInfo> {
+
+    @Override
+    protected CompactorRequestTypeInfo[] getTestData() {
+        return new CompactorRequestTypeInfo[] {
+            new CompactorRequestTypeInfo(
+                    () ->
+                            new FileSinkCommittableSerializer(
+                                    new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                                            FileSinkTestUtils.TestPendingFileRecoverable::new),
+                                    new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                                            FileSinkTestUtils.TestInProgressFileRecoverable::new)))
+        };
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
index b3d7041..b5b0c3e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
@@ -36,31 +36,99 @@ public class FileSinkTestUtils {
     /** A type of testing {@link InProgressFileWriter.PendingFileRecoverable}. */
     public static class TestPendingFileRecoverable extends StringValue
             implements InProgressFileWriter.PendingFileRecoverable {
+        private Path path;
+        private Path uncommittedPath;
+        private long size;
+
+        public TestPendingFileRecoverable() {
+            this.path = null;
+            this.uncommittedPath = null;
+            this.size = -1L;
+        }
+
+        public TestPendingFileRecoverable(Path path, long size) {
+            this.path = path;
+            this.uncommittedPath = new Path(path.getParent(), "." + path.getName());
+            this.size = size;
+        }
+
         @Override
         public Path getPath() {
-            return null;
+            return path;
+        }
+
+        public Path getUncommittedPath() {
+            return uncommittedPath;
         }
 
         @Override
         public long getSize() {
-            return -1L;
+            return size;
+        }
+
+        @Override
+        public String getValue() {
+            return size + "," + (path == null ? "" : path.toUri().toString());
+        }
+
+        @Override
+        public void setValue(CharSequence value, int offset, int len) {
+            String[] arr = value.subSequence(offset, len).toString().split(",");
+            size = Integer.parseInt(arr[0]);
+            path = arr.length == 1 ? null : new Path(arr[1]);
+            if (path != null) {
+                uncommittedPath = new Path(path.getParent(), "." + path.getName());
+            }
         }
-        // Nope
     }
 
     /** A type of testing {@link InProgressFileWriter.InProgressFileRecoverable}. */
     public static class TestInProgressFileRecoverable extends StringValue
             implements InProgressFileWriter.InProgressFileRecoverable {
+        private Path path;
+        private Path uncommittedPath;
+        private long size;
+
+        public TestInProgressFileRecoverable() {
+            this.path = null;
+            this.uncommittedPath = null;
+            this.size = -1L;
+        }
+
+        public TestInProgressFileRecoverable(Path path, long size) {
+            this.path = path;
+            this.uncommittedPath = new Path(path.getParent(), "." + path.getName());
+            this.size = size;
+        }
+
         @Override
         public Path getPath() {
-            return null;
+            return path;
+        }
+
+        public Path getUncommittedPath() {
+            return uncommittedPath;
         }
 
         @Override
         public long getSize() {
-            return -1L;
+            return size;
+        }
+
+        @Override
+        public String getValue() {
+            return size + "," + (path == null ? "" : path.toUri().toString());
+        }
+
+        @Override
+        public void setValue(CharSequence value, int offset, int len) {
+            String[] arr = value.subSequence(offset, len).toString().split(",");
+            size = Integer.parseInt(arr[0]);
+            path = arr.length == 1 ? null : new Path(arr[1]);
+            if (path != null) {
+                uncommittedPath = new Path(path.getParent(), "." + path.getName());
+            }
         }
-        // Nope
     }
 
     /**
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
index 94d4736..c00cca1 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink.utils;
 
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader.Decoder;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
@@ -28,6 +29,7 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -56,6 +58,30 @@ public class IntegerFileSinkTestDataUtils {
         }
     }
 
+    /** Testing {@link Decoder} that reads integers for compaction. */
+    public static class IntDecoder implements Decoder<Integer> {
+
+        private InputStream input;
+
+        @Override
+        public void open(InputStream input) throws IOException {
+            this.input = input;
+        }
+
+        @Override
+        public Integer decodeNext() throws IOException {
+            byte[] bytes = new byte[4];
+            int read = input.read(bytes);
+            return read < 0 ? null : ByteBuffer.wrap(bytes).getInt();
+        }
+
+        @Override
+        public void close() throws IOException {
+            input.close();
+            input = null;
+        }
+    }
+
     /** Testing {@link BucketAssigner} that assigns integers according to modulo. */
     public static class ModuloBucketAssigner implements BucketAssigner<Integer, String> {
 
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseShieldOutputStream.java b/flink-core/src/main/java/org/apache/flink/util/CloseShieldOutputStream.java
new file mode 100644
index 0000000..b0ef6bb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseShieldOutputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** A proxy output stream that prevents the underlying output stream from being closed. */
+public class CloseShieldOutputStream extends OutputStream {
+    private final OutputStream out;
+
+    public CloseShieldOutputStream(OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] buffer) throws IOException {
+        out.write(buffer);
+    }
+
+    @Override
+    public void write(byte[] buffer, int off, int len) throws IOException {
+        out.write(buffer, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Do not actually close the internal stream.
+    }
+}
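
For illustration only (this sketch is not part of the patch, and the class name is made up): the point of CloseShieldOutputStream is that a wrapping writer can be closed and flushed without closing the sink-owned stream underneath it, which is also why AvroWriterFactory below keeps working after its nested copy of the class is removed.

    import org.apache.flink.util.CloseShieldOutputStream;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;

    class CloseShieldExample {
        // Writes one record through a temporary Writer; closing the Writer flushes it,
        // but the shield keeps the sink-owned stream open for further writes.
        static void writeRecord(OutputStream sinkOwnedStream, String record) throws IOException {
            try (Writer writer =
                    new OutputStreamWriter(
                            new CloseShieldOutputStream(sinkOwnedStream), StandardCharsets.UTF_8)) {
                writer.write(record);
                writer.write('\n');
            }
        }
    }
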
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
new file mode 100644
index 0000000..ed17baa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A serializable {@link SupplierWithException}.
+ *
+ * @param <T> the type of results supplied by this supplier
+ * @param <E> the type of exception thrown by this supplier.
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface SerializableSupplierWithException<T, E extends Throwable>
+        extends SupplierWithException<T, E>, Serializable {}
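
A minimal usage sketch, not part of the patch (the class name is hypothetical): the interface lets a serializable component hold a factory whose get() may throw a checked exception; the CompactorRequestTypeInfo test above passes a factory lambda of this kind.

    import org.apache.flink.util.function.SerializableSupplierWithException;

    import java.io.IOException;
    import java.io.Serializable;

    class LazyLoadingComponent implements Serializable {

        // The supplier is serializable, so the component can be shipped with the job graph.
        private final SerializableSupplierWithException<String, IOException> loader;

        LazyLoadingComponent(SerializableSupplierWithException<String, IOException> loader) {
            this.loader = loader;
        }

        String load() throws IOException {
            return loader.get(); // may throw the declared checked exception
        }
    }
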
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroWriterFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroWriterFactory.java
index 473c68e..9475c13 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroWriterFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroWriterFactory.java
@@ -20,11 +20,11 @@ package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.util.CloseShieldOutputStream;
 
 import org.apache.avro.file.DataFileWriter;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * A factory that creates an {@link AvroBulkWriter}. The factory takes a user-supplied builder to
@@ -47,39 +47,4 @@ public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
     public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
         return new AvroBulkWriter<>(avroBuilder.createWriter(new CloseShieldOutputStream(out)));
     }
-
-    /** Proxy output stream that prevents the underlying output stream from being closed. */
-    private static class CloseShieldOutputStream extends OutputStream {
-        private final OutputStream out;
-
-        public CloseShieldOutputStream(OutputStream out) {
-            this.out = out;
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            out.write(b);
-        }
-
-        @Override
-        public void write(byte[] buffer) throws IOException {
-            out.write(buffer);
-        }
-
-        @Override
-        public void write(byte[] buffer, int off, int len) throws IOException {
-            out.write(buffer, off, len);
-        }
-
-        @Override
-        public void flush() throws IOException {
-            out.flush();
-        }
-
-        @Override
-        public void close() throws IOException {
-            // we do not actually close the internal stream here to prevent that the finishing
-            // of the Avro Writer closes the target output stream.
-        }
-    }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index f5a83ae..1acd328 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -53,6 +53,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-files</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

[flink] 02/06: [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.

commit 51adf0fcbdc63520a58246100b5a585b9f563ba6
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:23:27 2022 +0800

    [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
---
 .../7602816f-5c01-4b7a-9e3e-235dfedec245           |  1 -
 .../functions/sink/filesystem/BucketWriter.java    | 26 ++++++++++++
 .../functions/sink/filesystem/BulkPartWriter.java  |  1 +
 ...ssFileWriter.java => CompactingFileWriter.java} | 47 +++++++++-------------
 .../sink/filesystem/InProgressFileWriter.java      | 10 ++++-
 ... => OutputStreamBasedCompactingFileWriter.java} | 35 ++++++----------
 .../OutputStreamBasedPartFileWriter.java           | 36 ++++++++++++++++-
 ...er.java => RecordWiseCompactingFileWriter.java} | 33 +++++----------
 .../sink/filesystem/RowWisePartWriter.java         |  8 ++--
 9 files changed, 117 insertions(+), 80 deletions(-)

diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
index 4bfb283..f61d7d9 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
@@ -225,7 +225,6 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
 org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
-org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
index 88ad598..00c7890 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -40,6 +40,32 @@ public interface BucketWriter<IN, BucketID> {
             final BucketID bucketID, final Path path, final long creationTime) throws IOException;
 
     /**
+     * Used to create a new {@link CompactingFileWriter} of the requested type. Requesting a writer
+     * of an unsupported type results in an UnsupportedOperationException. By default, only the
+     * RECORD_WISE type is supported, for which an {@link InProgressFileWriter} will be created.
+     *
+     * @param type the type of this writer.
+     * @param bucketID the id of the bucket this writer is writing to.
+     * @param path the path this writer will write to.
+     * @param creationTime the creation time of the file.
+     * @return the new {@link CompactingFileWriter}
+     * @throws IOException Thrown if creating a writer fails.
+     * @throws UnsupportedOperationException Thrown if the bucket writer doesn't support the
+     *     requested type.
+     */
+    default CompactingFileWriter openNewCompactingFile(
+            final CompactingFileWriter.Type type,
+            final BucketID bucketID,
+            final Path path,
+            final long creationTime)
+            throws IOException {
+        if (type == CompactingFileWriter.Type.RECORD_WISE) {
+            return openNewInProgressFile(bucketID, path, creationTime);
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Used to resume a {@link InProgressFileWriter} from a {@link
      * InProgressFileWriter.InProgressFileRecoverable}.
      *
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index b5a12de..758296d 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -45,6 +45,7 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
 
     @Override
     public void write(IN element, long currentTime) throws IOException {
+        ensureWriteType(Type.RECORD_WISE);
         writer.addElement(element);
         markWrite(currentTime);
     }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
similarity index 51%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
index c316254..23033aa 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
@@ -18,44 +18,35 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 
 import java.io.IOException;
 
-/** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
-@Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+/**
+ * The file sink compactors use the {@link CompactingFileWriter} to write a compacting file.
+ *
+ * <p>A class should not implement the {@link CompactingFileWriter} directly, but should implement
+ * the {@link RecordWiseCompactingFileWriter}, the {@link OutputStreamBasedCompactingFileWriter}, or
+ * both. If a class implements both interfaces, once the write method of either interface is
+ * called, the write method of the other one should be disabled.
+ */
+@PublicEvolving
+public interface CompactingFileWriter {
 
     /**
-     * Write an element to the part file.
+     * Closes the writer and gets the {@link PendingFileRecoverable} of the written compacting file.
      *
-     * @param element the element to be written.
-     * @param currentTime the writing time.
-     * @throws IOException Thrown if writing the element fails.
-     */
-    void write(final IN element, final long currentTime) throws IOException;
-
-    /**
-     * @return The state of the current part file.
-     * @throws IOException Thrown if persisting the part file fails.
-     */
-    InProgressFileRecoverable persist() throws IOException;
-
-    /**
      * @return The state of the pending part file. {@link Bucket} uses this to commit the pending
      *     file.
      * @throws IOException Thrown if an I/O error occurs.
      */
     PendingFileRecoverable closeForCommit() throws IOException;
 
-    /** Dispose the part file. */
-    void dispose();
-
-    // ------------------------------------------------------------------------
-
-    /** A handle can be used to recover in-progress file.. */
-    interface InProgressFileRecoverable extends PendingFileRecoverable {}
-
-    /** The handle can be used to recover pending file. */
-    interface PendingFileRecoverable {}
+    /** Enum defining the types of {@link CompactingFileWriter}. */
+    @PublicEvolving
+    enum Type {
+        RECORD_WISE,
+        OUTPUT_STREAM
+    }
 }
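
To illustrate the contract described above, a sketch that is not part of the patch (the class name is hypothetical): a writer supporting both write styles records which write method is used first and rejects the other, in the same spirit as the ensureWriteType guard added to OutputStreamBasedPartFileWriter further below.

    import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
    import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    class DualModeCompactingFileWriter
            implements RecordWiseCompactingFileWriter<String>, OutputStreamBasedCompactingFileWriter {

        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private CompactingFileWriter.Type writeType; // null until the first write call fixes the mode

        @Override
        public void write(String element) throws IOException {
            ensureWriteType(CompactingFileWriter.Type.RECORD_WISE);
            out.write(element.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public OutputStream asOutputStream() throws IOException {
            ensureWriteType(CompactingFileWriter.Type.OUTPUT_STREAM);
            return out;
        }

        @Override
        public PendingFileRecoverable closeForCommit() throws IOException {
            // A real implementation would return a recoverable describing the written file.
            return null;
        }

        private void ensureWriteType(CompactingFileWriter.Type type) {
            if (writeType == null) {
                writeType = type;
            } else if (writeType != type) {
                throw new IllegalStateException(
                        "Writer already opened as " + writeType + ", cannot use " + type);
            }
        }
    }
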
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index c316254..f633023 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.IOException;
 
 /** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
 @Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+public interface InProgressFileWriter<IN, BucketID>
+        extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {
 
     /**
      * Write an element to the part file.
@@ -51,11 +53,17 @@ public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketI
     /** Dispose the part file. */
     void dispose();
 
+    @Override
+    default void write(IN element) throws IOException {
+        write(element, System.currentTimeMillis());
+    }
+
     // ------------------------------------------------------------------------
 
     /** A handle can be used to recover in-progress file.. */
     interface InProgressFileRecoverable extends PendingFileRecoverable {}
 
     /** The handle can be used to recover pending file. */
+    @PublicEvolving
     interface PendingFileRecoverable {}
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
index aeee8ee..82970595 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
@@ -19,33 +19,22 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 /**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link OutputStreamBasedCompactingFileWriter} to directly write a
+ * compacting file as an {@link OutputStream}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
-    private final Encoder<IN> encoder;
-
-    RowWisePartWriter(
-            final BucketID bucketId,
-            final RecoverableFsDataOutputStream currentPartStream,
-            final Encoder<IN> encoder,
-            final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
-        this.encoder = Preconditions.checkNotNull(encoder);
-    }
-
-    @Override
-    public void write(final IN element, final long currentTime) throws IOException {
-        encoder.encode(element, currentPartStream);
-        markWrite(currentTime);
-    }
+public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
+    /**
+     * Gets the output stream underlying the writer. The close method of the returned stream should
+     * never be called.
+     *
+     * @return The output stream to write the compacting file.
+     * @throws IOException Thrown if acquiring the stream fails.
+     */
+    OutputStream asOutputStream() throws IOException;
 }
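
A consumer-side sketch, for illustration only (the class name is hypothetical): a compactor obtains the stream, copies its input files into it, and closes neither the stream nor the writer, since the caller finishes the file via closeForCommit().

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.List;

    class ConcatenatingCompactorSketch {

        // Concatenates the given files into the writer's stream without closing it.
        void compact(List<Path> inputs, OutputStreamBasedCompactingFileWriter writer)
                throws IOException {
            OutputStream out = writer.asOutputStream(); // never call close() on this stream
            byte[] buffer = new byte[4096];
            for (Path input : inputs) {
                FileSystem fs = input.getFileSystem();
                try (InputStream in = fs.open(input)) {
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }
        }
    }
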
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 666f82b..13ceae5 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.IOUtils;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
 
 /**
  * The base class for all the part file writer that use {@link
@@ -39,10 +41,13 @@ import java.io.IOException;
  * @param <BucketID> the bucket type
  */
 public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
-        extends AbstractPartFileWriter<IN, BucketID> {
+        extends AbstractPartFileWriter<IN, BucketID>
+        implements OutputStreamBasedCompactingFileWriter {
 
     final RecoverableFsDataOutputStream currentPartStream;
 
+    private CompactingFileWriter.Type writeType = null;
+
     OutputStreamBasedPartFileWriter(
             final BucketID bucketID,
             final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
@@ -74,6 +79,27 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
         return currentPartStream.getPos();
     }
 
+    @Override
+    public OutputStream asOutputStream() throws IOException {
+        ensureWriteType(Type.OUTPUT_STREAM);
+        return currentPartStream;
+    }
+
+    protected void ensureWriteType(Type type) {
+        if (type != this.writeType) {
+            if (this.writeType == null) {
+                this.writeType = type;
+            } else {
+                throw new IllegalStateException(
+                        "Writer has already been opened as "
+                                + writeType
+                                + " type, but trying to reopen it as "
+                                + type
+                                + " type.");
+            }
+        }
+    }
+
     abstract static class OutputStreamBasedBucketWriter<IN, BucketID>
             implements BucketWriter<IN, BucketID> {
 
@@ -91,6 +117,14 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
         }
 
         @Override
+        public CompactingFileWriter openNewCompactingFile(
+                CompactingFileWriter.Type type, BucketID bucketID, Path path, long creationTime)
+                throws IOException {
+            // Both types are supported; override to avoid the UnsupportedOperationException.
+            return openNewInProgressFile(bucketID, path, creationTime);
+        }
+
+        @Override
         public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
                 final BucketID bucketID,
                 final InProgressFileRecoverable inProgressFileRecoverable,
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
index aeee8ee..01b48ed 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
@@ -19,33 +19,20 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link RecordWiseCompactingFileWriter} to write elements to a compacting
+ * file.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
-    private final Encoder<IN> encoder;
-
-    RowWisePartWriter(
-            final BucketID bucketId,
-            final RecoverableFsDataOutputStream currentPartStream,
-            final Encoder<IN> encoder,
-            final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
-        this.encoder = Preconditions.checkNotNull(encoder);
-    }
-
-    @Override
-    public void write(final IN element, final long currentTime) throws IOException {
-        encoder.encode(element, currentPartStream);
-        markWrite(currentTime);
-    }
+public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
+    /**
+     * Write an element to the compacting file.
+     *
+     * @param element the element to be written.
+     * @throws IOException Thrown if writing the element fails.
+     */
+    void write(IN element) throws IOException;
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index aeee8ee..f2b473c 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -27,14 +27,15 @@ import java.io.IOException;
 
 /**
  * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * implements the {@link PartFileInfo} and the {@link OutputStreamBasedCompactingFileWriter}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
+public final class RowWisePartWriter<IN, BucketID>
+        extends OutputStreamBasedPartFileWriter<IN, BucketID> {
 
     private final Encoder<IN> encoder;
 
-    RowWisePartWriter(
+    public RowWisePartWriter(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream currentPartStream,
             final Encoder<IN> encoder,
@@ -45,6 +46,7 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
 
     @Override
     public void write(final IN element, final long currentTime) throws IOException {
+        ensureWriteType(Type.RECORD_WISE);
         encoder.encode(element, currentPartStream);
         markWrite(currentTime);
     }

[flink] 06/06: [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aec2d38710a67d90bd819bfdce66b5a5a646a882
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 16 18:38:48 2022 +0800

    [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.
    
    This closes #18680.
---
 .../file/sink/BatchCompactingFileSinkITCase.java   |  69 ++++
 .../file/sink/FileSinkCompactionSwitchITCase.java  | 391 +++++++++++++++++++++
 .../flink/connector/file/sink/FileSinkITBase.java  |   4 +-
 .../sink/StreamingCompactingFileSinkITCase.java    |  69 ++++
 4 files changed, 532 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
new file mode 100644
index 0000000..5167e97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests the compaction of the {@link FileSink} in BATCH mode. */
+@RunWith(Parameterized.class)
+public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(createFileCompactStrategy(), createFileCompactor())
+                .build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
new file mode 100644
index 0000000..4a7c0f0
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder;
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for switching compaction on and off for the {@link FileSink}. */
+@RunWith(Parameterized.class)
+public class FileSinkCompactionSwitchITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    protected static final int NUM_SOURCES = 4;
+
+    protected static final int NUM_SINKS = 3;
+
+    protected static final int NUM_RECORDS = 10000;
+
+    protected static final int NUM_BUCKETS = 4;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<>();
+
+    private String latchId;
+
+    @Parameterized.Parameter public boolean isOnToOff;
+
+    @Parameterized.Parameters(name = "isOnToOff = {0}")
+    public static Collection<Object[]> params() {
+        return Arrays.asList(new Object[] {false}, new Object[] {true});
+    }
+
+    @Before
+    public void setup() {
+        this.latchId = UUID.randomUUID().toString();
+        // Wait for 3 checkpoints to ensure that the coordinator and all compactors have state
+        LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 3));
+    }
+
+    @After
+    public void teardown() {
+        LATCH_MAP.remove(latchId);
+    }
+
+    @Test
+    public void testSwitchingCompaction() throws Exception {
+        String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
+                sharedObjects.add(new ConcurrentHashMap<>());
+        JobGraph jobGraph = createJobGraph(path, isOnToOff, false, sendCountMap);
+        JobGraph restoringJobGraph = createJobGraph(path, !isOnToOff, true, sendCountMap);
+
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
+                        .build();
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.submitJob(jobGraph);
+
+            LATCH_MAP.get(latchId).await();
+
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(
+                                    jobGraph.getJobID(),
+                                    TEMPORARY_FOLDER.newFolder().getAbsolutePath(),
+                                    true,
+                                    SavepointFormatType.CANONICAL)
+                            .get();
+
+            // We wait for two successful checkpoints in sources before shutting down. This ensures
+            // that the sink can commit its data.
+            LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 2));
+
+            restoringJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, false));
+            miniCluster.executeJobBlocking(restoringJobGraph);
+        }
+
+        checkIntegerSequenceSinkOutput(path, sendCountMap.get(), NUM_BUCKETS, NUM_SOURCES);
+    }
+
+    private JobGraph createJobGraph(
+            String path,
+            boolean compactionEnabled,
+            boolean isFinite,
+            SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        env.configure(config, getClass().getClassLoader());
+
+        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.addSource(new CountingTestSource(latchId, NUM_RECORDS, isFinite, sendCountMap))
+                .setParallelism(NUM_SOURCES)
+                .sinkTo(createFileSink(path, compactionEnabled))
+                .uid("sink")
+                .setParallelism(NUM_SINKS);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        return streamGraph.getJobGraph();
+    }
+
+    private FileSink<Integer> createFileSink(String path, boolean compactionEnabled) {
+        DefaultRowFormatBuilder<Integer> sinkBuilder =
+                FileSink.forRowFormat(new Path(path), new IntEncoder())
+                        .withBucketAssigner(new ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(
+                                new FileSinkITBase.PartSizeAndCheckpointRollingPolicy(1024));
+
+        if (compactionEnabled) {
+            sinkBuilder =
+                    sinkBuilder.enableCompact(createFileCompactStrategy(), createFileCompactor());
+        }
+
+        return sinkBuilder.build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(2).build();
+    }
+
+    private static void checkIntegerSequenceSinkOutput(
+            String path, Map<Integer, Integer> countMap, int numBuckets, int numSources)
+            throws Exception {
+        assertEquals(numSources, countMap.size());
+
+        File dir = new File(path);
+        String[] subDirNames = dir.list();
+        assertNotNull(subDirNames);
+
+        Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
+        assertEquals(numBuckets, subDirNames.length);
+        for (int i = 0; i < numBuckets; ++i) {
+            assertEquals(Integer.toString(i), subDirNames[i]);
+
+            // now check its content
+            File bucketDir = new File(path, subDirNames[i]);
+            assertTrue(
+                    bucketDir.getAbsolutePath() + " should be an existing directory",
+                    bucketDir.isDirectory());
+
+            Map<Integer, Integer> counts = new HashMap<>();
+            File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
+            assertNotNull(files);
+
+            for (File file : files) {
+                assertTrue(file.isFile());
+
+                try (DataInputStream dataInputStream =
+                        new DataInputStream(new FileInputStream(file))) {
+                    while (true) {
+                        int value = dataInputStream.readInt();
+                        counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
+                    }
+                } catch (EOFException e) {
+                    // Reached the end of the file, stop reading.
+                }
+            }
+
+            int bucketId = i;
+            int expectedCount =
+                    countMap.values().stream()
+                            .map(
+                                    numRecords ->
+                                            numRecords / numBuckets
+                                                    + (bucketId < numRecords % numBuckets ? 1 : 0))
+                            .mapToInt(num -> num)
+                            .max()
+                            .getAsInt();
+            assertEquals(expectedCount, counts.size());
+
+            List<Integer> countList = new ArrayList<>(countMap.values());
+            Collections.sort(countList);
+            for (int j = 0; j < countList.size(); j++) {
+                int rangeFrom = j == 0 ? 0 : countList.get(j - 1);
+                rangeFrom =
+                        bucketId
+                                + (rangeFrom % numBuckets == 0
+                                        ? rangeFrom
+                                        : (rangeFrom + numBuckets - rangeFrom % numBuckets));
+                int rangeTo = countList.get(j);
+                for (int k = rangeFrom; k < rangeTo; k += numBuckets) {
+                    assertEquals(
+                            "The record "
+                                    + k
+                                    + " should occur "
+                                    + (numBuckets - j)
+                                    + " times, but only occurs "
+                                    + counts.getOrDefault(k, 0)
+                                    + " times",
+                            numBuckets - j,
+                            counts.getOrDefault(k, 0).intValue());
+                }
+            }
+        }
+    }
+
+    private static class CountingTestSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointListener, CheckpointedFunction {
+
+        private final String latchId;
+
+        private final int numberOfRecords;
+
+        private final boolean isFinite;
+
+        private final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap;
+
+        private ListState<Integer> nextValueState;
+
+        private int nextValue;
+
+        private volatile boolean isCanceled;
+
+        private volatile boolean snapshottedAfterAllRecordsOutput;
+
+        private volatile boolean isWaitingCheckpointComplete;
+
+        public CountingTestSource(
+                String latchId,
+                int numberOfRecords,
+                boolean isFinite,
+                SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+            this.latchId = latchId;
+            this.numberOfRecords = numberOfRecords;
+            this.isFinite = isFinite;
+            this.sendCountMap = sendCountMap;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            nextValueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("nextValue", Integer.class));
+
+            if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
+                nextValue = nextValueState.get().iterator().next();
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            // A finite source emits numberOfRecords more values and then finishes; an infinite
+            // source keeps emitting until it is canceled.
+            sendRecordsUntil(isFinite ? (nextValue + numberOfRecords) : Integer.MAX_VALUE, ctx);
+
+            // Wait for the last checkpoint to commit all the pending records.
+            isWaitingCheckpointComplete = true;
+            CountDownLatch latch = LATCH_MAP.get(latchId);
+            latch.await();
+        }
+
+        private void sendRecordsUntil(int targetNumber, SourceContext<Integer> ctx) {
+            while (!isCanceled && nextValue < targetNumber) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(nextValue++);
+                }
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            nextValueState.update(Collections.singletonList(nextValue));
+            sendCountMap.consumeSync(
+                    m -> m.put(getRuntimeContext().getIndexOfThisSubtask(), nextValue));
+
+            if (isWaitingCheckpointComplete) {
+                snapshottedAfterAllRecordsOutput = true;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (!isFinite || (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput)) {
+                CountDownLatch latch = LATCH_MAP.get(latchId);
+                latch.countDown();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6c424ec..50f8107 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.util.TestLogger;
 
@@ -94,7 +95,8 @@ public abstract class FileSinkITBase extends TestLogger {
                 .build();
     }
 
-    private static class PartSizeAndCheckpointRollingPolicy
+    /** A testing {@link RollingPolicy} that rolls on a maximum part size and on checkpoints. */
+    protected static class PartSizeAndCheckpointRollingPolicy
             extends CheckpointRollingPolicy<Integer, String> {
 
         private final long maxPartSize;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
new file mode 100644
index 0000000..227d49a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests the compaction of the {@link FileSink} in STREAMING mode. */
+@RunWith(Parameterized.class)
+public class StreamingCompactingFileSinkITCase extends StreamingExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(createFileCompactStrategy(), createFileCompactor())
+                .build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+    }
+}
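Reading the new IT cases together, the difference lies only in the FileCompactStrategy trigger: the batch and streaming cases compact once a bucket has accumulated a size threshold of pending files, while the switch case compacts every two checkpoints. The following condensed sketch contrasts the two configurations; the SimpleStringEncoder/ConcatFileCompactor pairing is purely illustrative and not what the tests use.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

final class CompactionStrategySketch {

    /** Compaction triggers once a bucket has ~10 KB of pending files, as in the batch/streaming cases. */
    static FileSink<String> sizeTriggered(String basePath) {
        return FileSink.forRowFormat(new Path(basePath), new SimpleStringEncoder<String>())
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build(),
                        new ConcatFileCompactor())
                .build();
    }

    /** Compaction triggers every 2 checkpoints, as in FileSinkCompactionSwitchITCase. */
    static FileSink<String> checkpointTriggered(String basePath) {
        return FileSink.forRowFormat(new Path(basePath), new SimpleStringEncoder<String>())
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(2)
                                .build(),
                        new ConcatFileCompactor())
                .build();
    }
}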

[flink] 03/06: [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2ea16c4aace21b72faf57a1760abf9d65035f3a
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jan 26 15:27:53 2022 +0800

    [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable.
---
 .../file/sink/utils/FileSinkTestUtils.java         |  19 +++
 .../sink/filesystem/BulkBucketWriter.java          |   5 +-
 .../functions/sink/filesystem/BulkPartWriter.java  |   4 +-
 .../sink/filesystem/InProgressFileWriter.java      |  13 +-
 .../OutputStreamBasedPartFileWriter.java           | 162 +++++++++++++++++++--
 .../sink/filesystem/RowWiseBucketWriter.java       |   5 +-
 .../sink/filesystem/RowWisePartWriter.java         |   4 +-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  64 +++++++-
 8 files changed, 254 insertions(+), 22 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
index 908aa42..b3d7041 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink.utils;
 
 import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
@@ -35,12 +36,30 @@ public class FileSinkTestUtils {
     /** A type of testing {@link InProgressFileWriter.PendingFileRecoverable}. */
     public static class TestPendingFileRecoverable extends StringValue
             implements InProgressFileWriter.PendingFileRecoverable {
+        @Override
+        public Path getPath() {
+            return null;
+        }
+
+        @Override
+        public long getSize() {
+            return -1L;
+        }
         // Nope
     }
 
     /** A type of testing {@link InProgressFileWriter.InProgressFileRecoverable}. */
     public static class TestInProgressFileRecoverable extends StringValue
             implements InProgressFileWriter.InProgressFileRecoverable {
+        @Override
+        public Path getPath() {
+            return null;
+        }
+
+        @Override
+        public long getSize() {
+            return -1L;
+        }
         // Nope
     }
 
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
index 7906243..0c4ee74 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
@@ -50,6 +50,7 @@ public class BulkBucketWriter<IN, BucketID>
     public InProgressFileWriter<IN, BucketID> resumeFrom(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream stream,
+            final Path path,
             final RecoverableWriter.ResumeRecoverable resumable,
             final long creationTime)
             throws IOException {
@@ -58,7 +59,7 @@ public class BulkBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(resumable);
 
         final BulkWriter<IN> writer = writerFactory.create(stream);
-        return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+        return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime);
     }
 
     @Override
@@ -73,6 +74,6 @@ public class BulkBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(path);
 
         final BulkWriter<IN> writer = writerFactory.create(stream);
-        return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+        return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime);
     }
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 758296d..d770c69 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -36,10 +37,11 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
 
     BulkPartWriter(
             final BucketID bucketId,
+            final Path path,
             final RecoverableFsDataOutputStream currentPartStream,
             final BulkWriter<IN> writer,
             final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
+        super(bucketId, path, currentPartStream, creationTime);
         this.writer = Preconditions.checkNotNull(writer);
     }
 
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index f633023..dbc8159 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
@@ -65,5 +68,13 @@ public interface InProgressFileWriter<IN, BucketID>
 
     /** The handle can be used to recover a pending file. */
     @PublicEvolving
-    interface PendingFileRecoverable {}
+    interface PendingFileRecoverable {
+
+        /** @return The target path of the pending file, null if unavailable. */
+        @Nullable
+        Path getPath();
+
+        /** @return The size of the pending file, -1 if unavailable. */
+        long getSize();
+    }
 }
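A short sketch of the kind of consumer these new accessors enable, namely size-based compaction triggering. The helper and threshold below are hypothetical and rely only on getSize() returning -1 when the size is unavailable, as specified above.

import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;

import java.util.List;

final class SizeThresholdSketch {

    /**
     * Returns true if the pending files of a bucket are known to reach the given size threshold.
     * Files reporting -1 (size unavailable) are conservatively ignored.
     */
    static boolean reachesThreshold(List<PendingFileRecoverable> pendingFiles, long sizeThreshold) {
        long knownSize = 0L;
        for (PendingFileRecoverable pendingFile : pendingFiles) {
            long size = pendingFile.getSize();
            if (size > 0) {
                knownSize += size;
            }
        }
        return knownSize >= sizeThreshold;
    }
}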
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 13ceae5..06d3bf9 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.IOUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Objects;
@@ -46,25 +48,31 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
 
     final RecoverableFsDataOutputStream currentPartStream;
 
+    @Nullable final Path targetPath;
+
     private CompactingFileWriter.Type writeType = null;
 
     OutputStreamBasedPartFileWriter(
             final BucketID bucketID,
+            @Nullable final Path path,
             final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
             final long createTime) {
         super(bucketID, createTime);
+        this.targetPath = path;
         this.currentPartStream = recoverableFsDataOutputStream;
     }
 
     @Override
     public InProgressFileRecoverable persist() throws IOException {
-        return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
+        return new OutputStreamBasedInProgressFileRecoverable(
+                currentPartStream.persist(), targetPath);
     }
 
     @Override
     public PendingFileRecoverable closeForCommit() throws IOException {
+        long size = currentPartStream.getPos();
         return new OutputStreamBasedPendingFileRecoverable(
-                currentPartStream.closeForCommit().getRecoverable());
+                currentPartStream.closeForCommit().getRecoverable(), targetPath, size);
     }
 
     @Override
@@ -137,6 +145,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                     bucketID,
                     recoverableWriter.recover(
                             outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
+                    inProgressFileRecoverable.getPath(),
                     outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
                     creationTime);
         }
@@ -192,6 +201,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
         public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
                 final BucketID bucketId,
                 final RecoverableFsDataOutputStream stream,
+                final Path path,
                 final RecoverableWriter.ResumeRecoverable resumable,
                 final long creationTime)
                 throws IOException;
@@ -205,14 +215,60 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
 
         private final RecoverableWriter.CommitRecoverable commitRecoverable;
 
+        @Nullable private final Path targetPath;
+        private final long fileSize;
+
+        @Deprecated
+        // Retained for state compatibility
         public OutputStreamBasedPendingFileRecoverable(
                 final RecoverableWriter.CommitRecoverable commitRecoverable) {
+            this(commitRecoverable, null, -1L);
+        }
+
+        public OutputStreamBasedPendingFileRecoverable(
+                final RecoverableWriter.CommitRecoverable commitRecoverable,
+                @Nullable final Path targetPath,
+                final long fileSize) {
             this.commitRecoverable = commitRecoverable;
+            this.targetPath = targetPath;
+            this.fileSize = fileSize;
         }
 
         RecoverableWriter.CommitRecoverable getCommitRecoverable() {
             return commitRecoverable;
         }
+
+        @Override
+        public Path getPath() {
+            return targetPath;
+        }
+
+        @Override
+        public long getSize() {
+            return fileSize;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            OutputStreamBasedPendingFileRecoverable that =
+                    (OutputStreamBasedPendingFileRecoverable) o;
+            return fileSize == that.fileSize
+                    && Objects.equals(commitRecoverable, that.commitRecoverable)
+                    && Objects.equals(targetPath, that.targetPath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(commitRecoverable, targetPath, fileSize);
+        }
     }
 
     /**
@@ -223,15 +279,57 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
             implements InProgressFileRecoverable {
 
         private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+        @Nullable private final Path targetPath;
 
+        @Deprecated
+        // Retained for state compatibility
         public OutputStreamBasedInProgressFileRecoverable(
                 final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
+            this(resumeRecoverable, null);
+        }
+
+        public OutputStreamBasedInProgressFileRecoverable(
+                final RecoverableWriter.ResumeRecoverable resumeRecoverable,
+                @Nullable final Path targetPath) {
             this.resumeRecoverable = resumeRecoverable;
+            this.targetPath = targetPath;
         }
 
         RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
             return resumeRecoverable;
         }
+
+        @Override
+        public Path getPath() {
+            return targetPath;
+        }
+
+        @Override
+        public long getSize() {
+            // The file size of an in-progress file is unavailable.
+            return -1L;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            OutputStreamBasedInProgressFileRecoverable that =
+                    (OutputStreamBasedInProgressFileRecoverable) o;
+            return Objects.equals(resumeRecoverable, that.resumeRecoverable)
+                    && Objects.equals(targetPath, that.targetPath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(resumeRecoverable, targetPath);
+        }
     }
 
     static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
@@ -269,7 +367,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -279,7 +377,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                     (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
             DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
             dataOutputSerializer.writeInt(MAGIC_NUMBER);
-            serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
+            serializeV2(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
             return dataOutputSerializer.getCopyOfBuffer();
         }
 
@@ -291,6 +389,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                     DataInputView dataInputView = new DataInputDeserializer(serialized);
                     validateMagicNumber(dataInputView);
                     return deserializeV1(dataInputView);
+                case 2:
+                    dataInputView = new DataInputDeserializer(serialized);
+                    validateMagicNumber(dataInputView);
+                    return deserializeV2(dataInputView);
                 default:
                     throw new IOException("Unrecognized version or corrupt state: " + version);
             }
@@ -301,11 +403,17 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
             return resumeSerializer;
         }
 
-        private void serializeV1(
+        private void serializeV2(
                 final OutputStreamBasedInProgressFileRecoverable
                         outputStreamBasedInProgressRecoverable,
                 final DataOutputView dataOutputView)
                 throws IOException {
+            boolean pathAvailable = outputStreamBasedInProgressRecoverable.targetPath != null;
+            dataOutputView.writeBoolean(pathAvailable);
+            if (pathAvailable) {
+                dataOutputView.writeUTF(
+                        outputStreamBasedInProgressRecoverable.targetPath.toUri().toString());
+            }
             SimpleVersionedSerialization.writeVersionAndSerialize(
                     resumeSerializer,
                     outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
@@ -319,6 +427,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                             resumeSerializer, dataInputView));
         }
 
+        private OutputStreamBasedInProgressFileRecoverable deserializeV2(
+                final DataInputView dataInputView) throws IOException {
+            Path path = null;
+            if (dataInputView.readBoolean()) {
+                path = new Path(dataInputView.readUTF());
+            }
+            return new OutputStreamBasedInProgressFileRecoverable(
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            resumeSerializer, dataInputView),
+                    path);
+        }
+
         private static void validateMagicNumber(final DataInputView dataInputView)
                 throws IOException {
             final int magicNumber = dataInputView.readInt();
@@ -346,7 +466,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -355,7 +475,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                     (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
             DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
             dataOutputSerializer.writeInt(MAGIC_NUMBER);
-            serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
+            serializeV2(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
             return dataOutputSerializer.getCopyOfBuffer();
         }
 
@@ -367,7 +487,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                     DataInputDeserializer in = new DataInputDeserializer(serialized);
                     validateMagicNumber(in);
                     return deserializeV1(in);
-
+                case 2:
+                    in = new DataInputDeserializer(serialized);
+                    validateMagicNumber(in);
+                    return deserializeV2(in);
                 default:
                     throw new IOException("Unrecognized version or corrupt state: " + version);
             }
@@ -378,11 +501,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
             return this.commitSerializer;
         }
 
-        private void serializeV1(
+        private void serializeV2(
                 final OutputStreamBasedPendingFileRecoverable
                         outputStreamBasedPendingFileRecoverable,
                 final DataOutputView dataOutputView)
                 throws IOException {
+            boolean pathAvailable = outputStreamBasedPendingFileRecoverable.targetPath != null;
+            dataOutputView.writeBoolean(pathAvailable);
+            if (pathAvailable) {
+                dataOutputView.writeUTF(
+                        outputStreamBasedPendingFileRecoverable.targetPath.toUri().toString());
+            }
+            dataOutputView.writeLong(outputStreamBasedPendingFileRecoverable.getSize());
             SimpleVersionedSerialization.writeVersionAndSerialize(
                     commitSerializer,
                     outputStreamBasedPendingFileRecoverable.getCommitRecoverable(),
@@ -396,6 +526,20 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
                             commitSerializer, dataInputView));
         }
 
+        private OutputStreamBasedPendingFileRecoverable deserializeV2(
+                final DataInputView dataInputView) throws IOException {
+            Path path = null;
+            if (dataInputView.readBoolean()) {
+                path = new Path(dataInputView.readUTF());
+            }
+            long size = dataInputView.readLong();
+            return new OutputStreamBasedPendingFileRecoverable(
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            commitSerializer, dataInputView),
+                    path,
+                    size);
+        }
+
         private static void validateMagicNumber(final DataInputView dataInputView)
                 throws IOException {
             final int magicNumber = dataInputView.readInt();
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
index 3476c26..1799195 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
@@ -47,13 +47,14 @@ public class RowWiseBucketWriter<IN, BucketID>
     public InProgressFileWriter<IN, BucketID> resumeFrom(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream stream,
+            final Path path,
             final RecoverableWriter.ResumeRecoverable resumable,
             final long creationTime) {
 
         Preconditions.checkNotNull(stream);
         Preconditions.checkNotNull(resumable);
 
-        return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+        return new RowWisePartWriter<>(bucketId, path, stream, encoder, creationTime);
     }
 
     @Override
@@ -66,6 +67,6 @@ public class RowWiseBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(stream);
         Preconditions.checkNotNull(path);
 
-        return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+        return new RowWisePartWriter<>(bucketId, path, stream, encoder, creationTime);
     }
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index f2b473c..2510c41 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -37,10 +38,11 @@ public final class RowWisePartWriter<IN, BucketID>
 
     public RowWisePartWriter(
             final BucketID bucketId,
+            final Path path,
             final RecoverableFsDataOutputStream currentPartStream,
             final Encoder<IN> encoder,
             final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
+        super(bucketId, path, currentPartStream, creationTime);
         this.encoder = Preconditions.checkNotNull(encoder);
     }
 
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
index cf30036..ccb6af7 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
@@ -70,7 +70,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
         writer.flush();
         writer.finish();
         fileCommitter.preCommit();
-        return new HadoopPathBasedPendingFile(fileCommitter).getRecoverable();
+        return new HadoopPathBasedPendingFile(fileCommitter, getSize()).getRecoverable();
     }
 
     @Override
@@ -86,8 +86,11 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
     static class HadoopPathBasedPendingFile implements BucketWriter.PendingFile {
         private final HadoopFileCommitter fileCommitter;
 
-        public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter) {
+        private final long fileSize;
+
+        public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter, long fileSize) {
             this.fileCommitter = fileCommitter;
+            this.fileSize = fileSize;
         }
 
         @Override
@@ -102,7 +105,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         public PendingFileRecoverable getRecoverable() {
             return new HadoopPathBasedPendingFileRecoverable(
-                    fileCommitter.getTargetFilePath(), fileCommitter.getTempFilePath());
+                    fileCommitter.getTargetFilePath(), fileCommitter.getTempFilePath(), fileSize);
         }
     }
 
@@ -112,9 +115,21 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         private final Path tempFilePath;
 
+        private final long fileSize;
+
+        @Deprecated
+        // Retained for compatibility
         public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
             this.targetFilePath = targetFilePath;
             this.tempFilePath = tempFilePath;
+            this.fileSize = -1L;
+        }
+
+        public HadoopPathBasedPendingFileRecoverable(
+                Path targetFilePath, Path tempFilePath, long fileSize) {
+            this.targetFilePath = targetFilePath;
+            this.tempFilePath = tempFilePath;
+            this.fileSize = fileSize;
         }
 
         public Path getTargetFilePath() {
@@ -124,6 +139,16 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
         public Path getTempFilePath() {
             return tempFilePath;
         }
+
+        @Override
+        public org.apache.flink.core.fs.Path getPath() {
+            return new org.apache.flink.core.fs.Path(targetFilePath.toString());
+        }
+
+        @Override
+        public long getSize() {
+            return fileSize;
+        }
     }
 
     @VisibleForTesting
@@ -139,7 +164,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -159,13 +184,15 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
             byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);
 
-            byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length];
+            byte[] targetBytes =
+                    new byte[12 + pathBytes.length + inProgressBytes.length + Long.BYTES];
             ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
             bb.putInt(MAGIC_NUMBER);
             bb.putInt(pathBytes.length);
             bb.put(pathBytes);
             bb.putInt(inProgressBytes.length);
             bb.put(inProgressBytes);
+            bb.putLong(hadoopRecoverable.getSize());
 
             return targetBytes;
         }
@@ -176,6 +203,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             switch (version) {
                 case 1:
                     return deserializeV1(serialized);
+                case 2:
+                    return deserializeV2(serialized);
                 default:
                     throw new IOException("Unrecognized version or corrupt state: " + version);
             }
@@ -200,6 +229,28 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             return new HadoopPathBasedPendingFileRecoverable(
                     new Path(targetFilePath), new Path(tempFilePath));
         }
+
+        private HadoopPathBasedPendingFileRecoverable deserializeV2(byte[] serialized)
+                throws IOException {
+            final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+            if (bb.getInt() != MAGIC_NUMBER) {
+                throw new IOException("Corrupt data: Unexpected magic number.");
+            }
+
+            byte[] targetFilePathBytes = new byte[bb.getInt()];
+            bb.get(targetFilePathBytes);
+            String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+            byte[] tempFilePathBytes = new byte[bb.getInt()];
+            bb.get(tempFilePathBytes);
+            String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+            long fileSize = bb.getLong();
+
+            return new HadoopPathBasedPendingFileRecoverable(
+                    new Path(targetFilePath), new Path(tempFilePath), fileSize);
+        }
     }
 
     private static class UnsupportedInProgressFileRecoverableSerializable
@@ -281,7 +332,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
                     fileCommitterFactory.recoverForCommit(
                             configuration,
                             hadoopRecoverable.getTargetFilePath(),
-                            hadoopRecoverable.getTempFilePath()));
+                            hadoopRecoverable.getTempFilePath()),
+                    hadoopRecoverable.getSize());
         }
 
         @Override

[flink] 04/06: [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 824752c82729b5fd6aab5c6f205476ae63c8aff5
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:31:17 2022 +0800

    [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
---
 .../connector/file/sink/FileSinkCommittable.java   | 74 +++++++++++++++++++++-
 .../file/sink/FileSinkCommittableSerializer.java   | 46 ++++++++++++--
 .../file/sink/committer/FileCommitter.java         | 21 ++++++
 .../file/sink/writer/FileWriterBucket.java         |  5 +-
 .../file/sink/FileCommittableSerializerTest.java   | 25 +++++++-
 .../file/sink/committer/FileCommitterTest.java     | 14 ++--
 6 files changed, 167 insertions(+), 18 deletions(-)
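
A minimal usage sketch (not part of the patch) of the reworked committable API, grounded in
the constructors and accessors added by this commit; the bucket id and paths are illustrative,
and the pending-file recoverable is the test stub used further down in the diff.

    import org.apache.flink.connector.file.sink.FileSinkCommittable;
    import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
    import org.apache.flink.core.fs.Path;

    public final class FileSinkCommittableSketch {
        public static void main(String[] args) {
            // Every committable now carries the id of the bucket it belongs to.
            FileSinkCommittable pending =
                    new FileSinkCommittable(
                            "bucket-0", new FileSinkTestUtils.TestPendingFileRecoverable());

            // A committable may instead ask the committer to delete a file whose
            // contents have already been compacted into a larger file.
            FileSinkCommittable cleanup =
                    new FileSinkCommittable("bucket-0", new Path("/tmp/compacted-source-file"));

            System.out.println(pending.getBucketId());                // bucket-0
            System.out.println(pending.hasPendingFile());             // true
            System.out.println(pending.hasCompactedFileToCleanup());  // false
            System.out.println(cleanup.hasCompactedFileToCleanup());  // true
        }
    }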

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
index 2c5e8e5..7ea5b1d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
@@ -19,11 +19,13 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -34,26 +36,51 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class FileSinkCommittable implements Serializable {
 
+    private final String bucketId;
+
     @Nullable private final InProgressFileWriter.PendingFileRecoverable pendingFile;
 
     @Nullable private final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup;
 
-    public FileSinkCommittable(InProgressFileWriter.PendingFileRecoverable pendingFile) {
+    @Nullable private final Path compactedFileToCleanup;
+
+    public FileSinkCommittable(
+            String bucketId, InProgressFileWriter.PendingFileRecoverable pendingFile) {
+        this.bucketId = bucketId;
         this.pendingFile = checkNotNull(pendingFile);
         this.inProgressFileToCleanup = null;
+        this.compactedFileToCleanup = null;
     }
 
     public FileSinkCommittable(
+            String bucketId,
             InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+        this.bucketId = bucketId;
         this.pendingFile = null;
         this.inProgressFileToCleanup = checkNotNull(inProgressFileToCleanup);
+        this.compactedFileToCleanup = null;
+    }
+
+    public FileSinkCommittable(String bucketId, Path compactedFileToCleanup) {
+        this.bucketId = bucketId;
+        this.pendingFile = null;
+        this.inProgressFileToCleanup = null;
+        this.compactedFileToCleanup = checkNotNull(compactedFileToCleanup);
     }
 
     FileSinkCommittable(
+            String bucketId,
             @Nullable InProgressFileWriter.PendingFileRecoverable pendingFile,
-            @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+            @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup,
+            @Nullable Path compactedFileToCleanup) {
+        this.bucketId = bucketId;
         this.pendingFile = pendingFile;
         this.inProgressFileToCleanup = inProgressFileToCleanup;
+        this.compactedFileToCleanup = compactedFileToCleanup;
+    }
+
+    public String getBucketId() {
+        return bucketId;
     }
 
     public boolean hasPendingFile() {
@@ -73,4 +100,47 @@ public class FileSinkCommittable implements Serializable {
     public InProgressFileWriter.InProgressFileRecoverable getInProgressFileToCleanup() {
         return inProgressFileToCleanup;
     }
+
+    public boolean hasCompactedFileToCleanup() {
+        return compactedFileToCleanup != null;
+    }
+
+    @Nullable
+    public Path getCompactedFileToCleanup() {
+        return compactedFileToCleanup;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FileSinkCommittable that = (FileSinkCommittable) o;
+        return Objects.equals(bucketId, that.bucketId)
+                && Objects.equals(pendingFile, that.pendingFile)
+                && Objects.equals(inProgressFileToCleanup, that.inProgressFileToCleanup)
+                && Objects.equals(compactedFileToCleanup, that.compactedFileToCleanup);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucketId, pendingFile, inProgressFileToCleanup, compactedFileToCleanup);
+    }
+
+    @Override
+    public String toString() {
+        return "FileSinkCommittable{"
+                + "bucketId='"
+                + bucketId
+                + ", pendingFile="
+                + pendingFile
+                + ", inProgressFileToCleanup="
+                + inProgressFileToCleanup
+                + ", compactedFileToCleanup="
+                + compactedFileToCleanup
+                + '}';
+    }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
index 99ef42f..febc9a5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -55,14 +56,14 @@ public class FileSinkCommittableSerializer
 
     @Override
     public int getVersion() {
-        return 1;
+        return 2;
     }
 
     @Override
     public byte[] serialize(FileSinkCommittable committable) throws IOException {
         DataOutputSerializer out = new DataOutputSerializer(256);
         out.writeInt(MAGIC_NUMBER);
-        serializeV1(committable, out);
+        serializeV2(committable, out);
         return out.getCopyOfBuffer();
     }
 
@@ -74,14 +75,17 @@ public class FileSinkCommittableSerializer
             case 1:
                 validateMagicNumber(in);
                 return deserializeV1(in);
+            case 2:
+                validateMagicNumber(in);
+                return deserializeV2(in);
             default:
                 throw new IOException("Unrecognized version or corrupt state: " + version);
         }
     }
 
-    private void serializeV1(FileSinkCommittable committable, DataOutputView dataOutputView)
+    private void serializeV2(FileSinkCommittable committable, DataOutputView dataOutputView)
             throws IOException {
-
+        dataOutputView.writeUTF(committable.getBucketId());
         if (committable.hasPendingFile()) {
             dataOutputView.writeBoolean(true);
             SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -99,6 +103,13 @@ public class FileSinkCommittableSerializer
         } else {
             dataOutputView.writeBoolean(false);
         }
+
+        if (committable.hasCompactedFileToCleanup()) {
+            dataOutputView.writeBoolean(true);
+            dataOutputView.writeUTF(committable.getCompactedFileToCleanup().toUri().toString());
+        } else {
+            dataOutputView.writeBoolean(false);
+        }
     }
 
     private FileSinkCommittable deserializeV1(DataInputView dataInputView) throws IOException {
@@ -116,7 +127,32 @@ public class FileSinkCommittableSerializer
                             inProgressFileSerializer, dataInputView);
         }
 
-        return new FileSinkCommittable(pendingFile, inProgressFileToCleanup);
+        return new FileSinkCommittable("", pendingFile, inProgressFileToCleanup, null);
+    }
+
+    private FileSinkCommittable deserializeV2(DataInputView dataInputView) throws IOException {
+        String bucketId = dataInputView.readUTF();
+        InProgressFileWriter.PendingFileRecoverable pendingFile = null;
+        if (dataInputView.readBoolean()) {
+            pendingFile =
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            pendingFileSerializer, dataInputView);
+        }
+
+        InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup = null;
+        if (dataInputView.readBoolean()) {
+            inProgressFileToCleanup =
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            inProgressFileSerializer, dataInputView);
+        }
+
+        Path compactedFileToCleanup = null;
+        if (dataInputView.readBoolean()) {
+            compactedFileToCleanup = new Path(dataInputView.readUTF());
+        }
+
+        return new FileSinkCommittable(
+                bucketId, pendingFile, inProgressFileToCleanup, compactedFileToCleanup);
     }
 
     private static void validateMagicNumber(DataInputView in) throws IOException {
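
The version bump stays backward compatible: new committables are always written with the
version-2 layout, while bytes persisted by the version-1 format still deserialize and come
back with an empty bucket id and no compacted file to clean up. A small round-trip sketch
(not part of the patch), assuming only the SimpleVersionedSerializer contract used above:

    import org.apache.flink.connector.file.sink.FileSinkCommittable;
    import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;

    import java.io.IOException;

    final class CommittableSerializerRoundTrip {
        // Serialize with the current version (2) and read the bytes back. For state
        // written by an older job, the stored version (1) would be passed instead and
        // deserializeV1 fills in an empty bucket id.
        static FileSinkCommittable roundTrip(
                FileSinkCommittableSerializer serializer, FileSinkCommittable committable)
                throws IOException {
            byte[] bytes = serializer.serialize(committable);
            return serializer.deserialize(serializer.getVersion(), bytes);
        }
    }
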
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index c72b399..7590178 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class FileCommitter implements Committer<FileSinkCommittable> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
     private final BucketWriter<?, ?> bucketWriter;
 
     public FileCommitter(BucketWriter<?, ?> bucketWriter) {
@@ -60,6 +66,21 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
                 bucketWriter.cleanupInProgressFileRecoverable(
                         committable.getInProgressFileToCleanup());
             }
+
+            if (committable.hasCompactedFileToCleanup()) {
+                Path compactedFileToCleanup = committable.getCompactedFileToCleanup();
+                try {
+                    compactedFileToCleanup.getFileSystem().delete(compactedFileToCleanup, false);
+                } catch (Exception e) {
+                    // Best-effort cleanup of compacted files; skip if deletion fails.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Failed to clean up a compacted file; the file remains but should not be visible: {}",
+                                compactedFileToCleanup,
+                                e);
+                    }
+                }
+            }
         }
     }
 
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index 3f5f5f9..8385fe9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -202,11 +202,12 @@ class FileWriterBucket<IN> {
         }
 
         List<FileSinkCommittable> committables = new ArrayList<>();
-        pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+        pendingFiles.forEach(
+                pendingFile -> committables.add(new FileSinkCommittable(bucketId, pendingFile)));
         pendingFiles.clear();
 
         if (inProgressFileToCleanup != null) {
-            committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+            committables.add(new FileSinkCommittable(bucketId, inProgressFileToCleanup));
             inProgressFileToCleanup = null;
         }
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
index 523fde0..ffda5dd 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+import org.apache.flink.core.fs.Path;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -36,23 +37,43 @@ public class FileCommittableSerializerTest {
     @Test
     public void testCommittableWithPendingFile() throws IOException {
         FileSinkCommittable committable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
+                new FileSinkCommittable("0", new FileSinkTestUtils.TestPendingFileRecoverable());
         FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
         assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
         assertEquals(
                 committable.getInProgressFileToCleanup(),
                 deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
     }
 
     @Test
     public void testCommittableWithInProgressFileToCleanup() throws IOException {
         FileSinkCommittable committable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
+                new FileSinkCommittable("0", new FileSinkTestUtils.TestInProgressFileRecoverable());
+        FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
+        assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
+        assertEquals(
+                committable.getInProgressFileToCleanup(),
+                deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
+    }
+
+    @Test
+    public void testCommittableWithCompactedFileToCleanup() throws IOException {
+        FileSinkCommittable committable =
+                new FileSinkCommittable("0", new Path("/tmp/mock_path_to_cleanup"));
         FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
         assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
         assertEquals(
                 committable.getInProgressFileToCleanup(),
                 deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
     }
 
     private FileSinkCommittable serializeAndDeserialize(FileSinkCommittable committable)
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 0b07370..5b7e21b 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -50,7 +50,7 @@ public class FileCommitterTest {
         MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
                 new MockCommitRequest<>(
                         new FileSinkCommittable(
-                                new FileSinkTestUtils.TestPendingFileRecoverable()));
+                                "0", new FileSinkTestUtils.TestPendingFileRecoverable()));
         fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -67,7 +67,7 @@ public class FileCommitterTest {
         MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
                 new MockCommitRequest<>(
                         new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
+                                "0", new FileSinkTestUtils.TestInProgressFileRecoverable()));
         fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -83,15 +83,15 @@ public class FileCommitterTest {
         Collection<CommitRequest<FileSinkCommittable>> committables =
                 Stream.of(
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestInProgressFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestInProgressFileRecoverable()))
+                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()))
                         .map(MockCommitRequest::new)
                         .collect(Collectors.toList());
         fileCommitter.commit(committables);

[flink] 01/06: [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a0d6fa4de610ded1220845365c59a84831bc454
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 18:56:41 2022 +0800

    [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.
---
 .../runtime/translators/SinkTransformationTranslator.java     | 11 -----------
 1 file changed, 11 deletions(-)
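
For illustration only: the hotfix itself just deletes the unconditional parallelism overwrite
shown below, but the check the commit title alludes to could look roughly like the following
helper (a hypothetical sketch, not part of this patch), which copies the sink's parallelism
onto an expanded transformation only when none was configured on it explicitly:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.dag.Transformation;

    final class ParallelismGuardSketch {
        // Only propagate the sink's parallelism if the expanded transformation still
        // reports the framework default, i.e. no parallelism was set on it explicitly.
        static void setParallelismIfAbsent(Transformation<?> sub, int sinkParallelism) {
            if (sub.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
                sub.setParallelism(sinkParallelism);
            }
        }
    }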

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index d2efbb8..97c19d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
 import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -233,16 +232,6 @@ public class SinkTransformationTranslator<Input, Output>
                     transformations.subList(numTransformsBefore, transformations.size());
 
             for (Transformation<?> subTransformation : expandedTransformations) {
-                // Skip overwriting the parallelism for the global committer
-                if (subTransformation.getName() == null
-                        || !subTransformation
-                                .getName()
-                                .equals(
-                                        StandardSinkTopologies
-                                                .GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
-                    subTransformation.setParallelism(transformation.getParallelism());
-                }
-
                 concatUid(
                         subTransformation,
                         Transformation::getUid,