You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/08 06:09:35 UTC

[flink] branch master updated: [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 13b203f  [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.
13b203f is described below

commit 13b203fef748bdbe9b1d14ba01f23ca6c6b24b7e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Fri Mar 4 11:24:44 2022 +0800

    [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.
    
    This closes #18955.
---
 .../operator/CompactorOperatorStateHandler.java    | 354 +++++++++++----------
 .../file/sink/compactor/CompactorOperatorTest.java | 140 +++++++-
 2 files changed, 314 insertions(+), 180 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
index 70e6458..37d83ec 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
@@ -23,16 +23,15 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
-import org.apache.flink.connector.file.sink.compactor.FileCompactor;
 import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
 import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.RemainingRequestsSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
-import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -41,6 +40,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -61,31 +63,12 @@ public class CompactorOperatorStateHandler
 
     private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
     private final BucketWriter<?, String> bucketWriter;
-
-    private final FileCompactor fileCompactor;
+    private State state = State.HANDLING_STATE;
 
     private transient CompactService compactService;
-
-    // Flag indicating the in-progress file of the previous run from the writer has been received
-    // and processed.
-    private boolean writerStateDrained = false;
-
-    // Flag indicating all compaction related states are drained, the operator can now pass through
-    // everything.
-    private boolean stateDrained = false;
-
-    // There may be a in-progress file of the previous run that we have to process as a compaction
-    // request first, or the file is invisible after committing.
-    // We have to hold the summary and committables (of this run), and send them along with the
-    // result of this compaction request, as well as the results of the remaining requests of this
-    // operator, if there are.
-    private CommittableSummary<FileSinkCommittable> holdingSummary;
-    private List<CommittableMessage<FileSinkCommittable>> holdingMessages;
-    private final List<CommittableMessage<FileSinkCommittable>> compactingMessages =
-            new ArrayList<>();
-
     private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
-            compactingRequests = new ArrayList<>();
+            compactingRequests = new LinkedList<>();
+
     private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
     private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining;
 
@@ -94,14 +77,25 @@ public class CompactorOperatorStateHandler
             BucketWriter<?, String> bucketWriter) {
         this.committableSerializer = committableSerializer;
         this.bucketWriter = bucketWriter;
+    }
 
-        this.fileCompactor = new IdenticalFileCompactor();
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        // CompactService is not initialized yet, we can not submit requests here.
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new CompactorRequestSerializer(committableSerializer)));
+        stateRemaining = remainingRequestsState.get();
     }
 
     @Override
     public void open() throws Exception {
         super.open();
-        this.compactService = new CompactService(1, fileCompactor, bucketWriter);
+        this.compactService = new CompactService(1, new IdenticalFileCompactor(), bucketWriter);
         compactService.open();
 
         if (stateRemaining != null) {
@@ -122,12 +116,14 @@ public class CompactorOperatorStateHandler
                         for (FileSinkCommittable toCompact : toCompactList) {
                             CompactorRequest compactRequest = new CompactorRequest(bucketId);
                             compactRequest.addToCompact(toCompact);
-                            submit(compactRequest);
+                            compactingRequests.add(
+                                    new Tuple2<>(compactRequest, submit(compactRequest)));
                         }
 
                         CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
                         toPassThrough.forEach(passThroughRequest::addToPassthrough);
-                        submit(passThroughRequest);
+                        compactingRequests.add(
+                                new Tuple2<>(passThroughRequest, submit(passThroughRequest)));
                     }
                 }
             }
@@ -141,77 +137,137 @@ public class CompactorOperatorStateHandler
             throws Exception {
         Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record =
                 element.getValue();
-        if (stateDrained) {
+        if (state == State.PASSING_THROUGH_ALL) {
             // all input should be committable messages to pass through
             output.collect(new StreamRecord<>(record.left()));
             return;
         }
 
+        if (state == State.PASSING_THROUGH_COMMITTABLE) {
+            checkState(
+                    record.isLeft(),
+                    "Compacting requests is not expected once a normal committable is received.");
+            CommittableMessage<FileSinkCommittable> message = record.left();
+            if (message instanceof CommittableWithLineage) {
+                checkState(
+                        !isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message),
+                        "Hidden committable is not expected once a normal committable is received.");
+                output.collect(new StreamRecord<>(message));
+            } else {
+                // Message is summary
+                if (compactingRequests.isEmpty()) {
+                    // No compacting requests remained
+                    state = State.PASSING_THROUGH_ALL;
+                    output.collect(new StreamRecord<>(message));
+                    return;
+                }
+                appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message);
+            }
+            return;
+        }
+
         if (record.isRight()) {
-            submit(element.getValue().right());
+            CompactorRequest request = element.getValue().right();
+            compactingRequests.add(new Tuple2<>(request, submit(request)));
             return;
         }
 
         CommittableMessage<FileSinkCommittable> message = record.left();
-        if (message instanceof CommittableSummary) {
-            checkState(holdingSummary == null, "Duplicate summary before the first checkpoint.");
-            holdingSummary = (CommittableSummary<FileSinkCommittable>) message;
-            holdingMessages = new ArrayList<>(holdingSummary.getNumberOfCommittables());
-        } else {
-            boolean compacting = false;
-            CommittableWithLineage<FileSinkCommittable> committableWithLineage =
-                    (CommittableWithLineage<FileSinkCommittable>) message;
-            if (committableWithLineage.getCommittable().hasPendingFile()) {
-                FileSinkCommittable committable = committableWithLineage.getCommittable();
-                PendingFileRecoverable pendingFile = committable.getPendingFile();
-                if (pendingFile.getPath() != null
-                        && pendingFile.getPath().getName().startsWith(".")) {
-                    // The pending file is the in-progress file of the previous run, which
-                    // should be committed and compacted before sending to the committer.
-                    CompactorRequest request = new CompactorRequest(committable.getBucketId());
-                    request.addToCompact(committable);
-                    submit(request);
-
-                    compacting = true;
-                    compactingMessages.add(message);
-                } else {
-                    // A normal file is received, indicating the writer state is drained.
-                    writerStateDrained = true;
-                    if (compactingMessages.isEmpty() && compactingRequests.isEmpty()) {
-                        // No state needs to be handled, the holding summary and all committable
-                        // messages can be sent eagerly
-                        checkState(holdingSummary != null);
-                        output.collect(new StreamRecord<>(holdingSummary));
-                        holdingSummary = null;
-
-                        this.stateDrained = true;
-                        output.collect(new StreamRecord<>(committableWithLineage));
-                    }
-                }
+        if (message instanceof CommittableWithLineage) {
+            if (isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message)) {
+                handleHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message);
+            } else {
+                // No more hidden committable is expected
+                state = State.PASSING_THROUGH_COMMITTABLE;
+                output.collect(new StreamRecord<>(message));
             }
-            if (!compacting && !stateDrained) {
-                // Compacting messages should not be added
-                // If the state is drained, no further messages need to be added
-                holdingMessages.add(message);
+        } else {
+            if (compactingRequests.isEmpty()) {
+                output.collect(new StreamRecord<>(message));
+                return;
             }
+            appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message);
         }
     }
 
-    @Override
-    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        super.prepareSnapshotPreBarrier(checkpointId);
-        if (stateDrained) {
-            return;
+    private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommittable> summary)
+            throws ExecutionException, InterruptedException {
+        // To guarantee the order, we have to wait for all results here.
+        List<FileSinkCommittable> results = new ArrayList<>();
+        for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t :
+                compactingRequests) {
+            t.f1.get().forEach(results::add);
+        }
+        compactingRequests.clear();
+
+        // Append the results to the summary and send them following it
+        output.collect(
+                new StreamRecord<>(
+                        new CommittableSummary<>(
+                                summary.getSubtaskId(),
+                                summary.getNumberOfSubtasks(),
+                                getCheckpointId(summary),
+                                summary.getNumberOfCommittables() + results.size(),
+                                summary.getNumberOfPendingCommittables() + results.size(),
+                                summary.getNumberOfFailedCommittables())));
+        for (FileSinkCommittable committable : results) {
+            output.collect(
+                    new StreamRecord<>(
+                            new CommittableWithLineage<>(
+                                    committable,
+                                    getCheckpointId(summary),
+                                    summary.getSubtaskId())));
+        }
+    }
+
+    private boolean isHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message) {
+        return message.getCommittable().hasPendingFile()
+                && message.getCommittable().getPendingFile().getPath() != null
+                && message.getCommittable().getPendingFile().getPath().getName().startsWith(".");
+    }
+
+    private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message)
+            throws ExecutionException, InterruptedException {
+        FileSinkCommittable committable = message.getCommittable();
+
+        // The pending file is the in-progress file of the previous run, which
+        // should be committed and compacted before sending to the committer.
+        CompactorRequest request = new CompactorRequest(committable.getBucketId());
+        request.addToCompact(committable);
+
+        // Wait for the result synchronously and pass through the result, but append the
+        // cleanup requests to the next summary, since the count of pending committables
+        // for this checkpoint is immutable now.
+        Iterable<FileSinkCommittable> result = submit(request).get();
+        Long checkpointId = getCheckpointId(message);
+        boolean pendingFileSent = false;
+        for (FileSinkCommittable c : result) {
+            if (c.hasPendingFile()) {
+                checkState(
+                        !pendingFileSent,
+                        "A in-progress file should not be converted to multiple pending files");
+                pendingFileSent = true;
+                output.collect(
+                        new StreamRecord<>(
+                                new CommittableWithLineage<>(
+                                        c, checkpointId, message.getSubtaskId())));
+            } else {
+                // Wrap the cleanup request as a pass-through request and keep it in the
+                // compacting requests.
+                // These requests will be appended to the next summary, if there is one.
+                CompactorRequest passThroughRequest = new CompactorRequest(c.getBucketId());
+                passThroughRequest.addToPassthrough(c);
+                compactingRequests.add(
+                        new Tuple2<>(passThroughRequest, submit(passThroughRequest)));
+            }
         }
-        drain();
-        // The operator is stateless once drain is called. snapshotState is not necessary.
     }
 
     @Override
     public void endInput() throws Exception {
-        if (!stateDrained) {
-            drain();
-        }
+        // Although there may be remaining cleanup requests in compactingRequests, there is no way
+        // to let the Committer accept them since the end-of-input summary has been sent.
+        // For now we can do nothing but leave them there.
     }
 
     @Override
@@ -222,102 +278,74 @@ public class CompactorOperatorStateHandler
         }
     }
 
-    private void submit(CompactorRequest request) {
-        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
-        compactService.submit(request, resultFuture);
-        compactingRequests.add(new Tuple2<>(request, resultFuture));
-    }
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (compactingRequests.isEmpty()) {
+            return;
+        }
 
-    private void drain() throws ExecutionException, InterruptedException {
-        checkState(holdingSummary != null);
-        checkState(
-                holdingSummary.getNumberOfPendingCommittables()
-                                == holdingSummary.getNumberOfCommittables()
-                        && holdingSummary.getNumberOfCommittables()
-                                == holdingMessages.size() + compactingMessages.size());
-
-        Long checkpointId =
-                holdingSummary.getCheckpointId().isPresent()
-                        ? holdingSummary.getCheckpointId().getAsLong()
-                        : null;
-        int subtaskId = holdingSummary.getSubtaskId();
-
-        if (!compactingRequests.isEmpty()) {
-            CompletableFuture.allOf(
-                            compactingRequests.stream()
-                                    .map(r -> r.f1)
-                                    .toArray(CompletableFuture[]::new))
-                    .join();
-
-            for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>
-                    compacting : compactingRequests) {
-                CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1;
-                checkState(future.isDone());
-                // Exception is thrown if it's completed exceptionally
-                for (FileSinkCommittable c : future.get()) {
-                    holdingMessages.add(new CommittableWithLineage<>(c, checkpointId, subtaskId));
+        // Results of some requests are not drained by a summary. They should be kept in the
+        // state and wait for the next summary.
+
+        List<CompactorRequest> remainingRequests = new ArrayList<>();
+        for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t :
+                compactingRequests) {
+            if (t.f1.isDone()) {
+                // We can add the results as a pass-through request if the compaction is done
+                Iterable<FileSinkCommittable> result = t.f1.get();
+                if (result.iterator().hasNext()) {
+                    String bucketId = result.iterator().next().getBucketId();
+                    CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
+                    result.forEach(passThroughRequest::addToPassthrough);
+                    remainingRequests.add(passThroughRequest);
                 }
+            } else {
+                // Or we add the original request in the state
+                remainingRequests.add(t.f0);
             }
         }
-
-        // Appending the compacted committable to the holding summary
-        CommittableSummary<FileSinkCommittable> summary =
-                new CommittableSummary<>(
-                        holdingSummary.getSubtaskId(),
-                        holdingSummary.getNumberOfSubtasks(),
-                        holdingSummary.getCheckpointId().isPresent()
-                                ? holdingSummary.getCheckpointId().getAsLong()
-                                : null,
-                        holdingMessages.size(),
-                        holdingMessages.size(),
-                        holdingSummary.getNumberOfFailedCommittables());
-        output.collect(new StreamRecord<>(summary));
-        for (CommittableMessage<FileSinkCommittable> committable : holdingMessages) {
-            output.collect(new StreamRecord<>(committable));
-        }
-
-        // Remaining requests should be all done and their results are all emitted.
-        // From now on the operator is stateless.
-        remainingRequestsState.clear();
-
-        compactingRequests.clear();
-        compactingMessages.clear();
-        holdingSummary = null;
-        holdingMessages = null;
-
-        if (writerStateDrained) {
-            // We can pass through everything if the writer state is also drained.
-            stateDrained = true;
-            compactService.close();
-            compactService = null;
-        }
+        Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>();
+        requestsMap.put(-1L, remainingRequests);
+        remainingRequestsState.update(Collections.singletonList(requestsMap));
     }
 
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        super.initializeState(context);
-
-        remainingRequestsState =
-                new SimpleVersionedListState<>(
-                        context.getOperatorStateStore()
-                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
-                        new RemainingRequestsSerializer(
-                                new CompactorRequestSerializer(committableSerializer)));
-
-        stateRemaining = remainingRequestsState.get();
+    private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) {
+        return message.getCheckpointId().isPresent() ? message.getCheckpointId().getAsLong() : null;
+    }
 
-        // stateDrained can not be determined here, since even if the stateRemaining is empty,
-        // there may still be some requests from the coordinator and a in-progress file in the file
-        // writer
+    private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) {
+        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
+        compactService.submit(request, resultFuture);
+        return resultFuture;
     }
 
     @VisibleForTesting
-    public boolean isWriterStateDrained() {
-        return writerStateDrained;
+    public boolean isPassingThroughCommittable() {
+        return state == State.PASSING_THROUGH_COMMITTABLE;
     }
 
     @VisibleForTesting
-    public boolean isStateDrained() {
-        return stateDrained;
+    public boolean isPassingThroughAll() {
+        return state == State.PASSING_THROUGH_ALL;
+    }
+
+    /** The handling state of this operator. */
+    private enum State {
+        /**
+         * Handling states of the previous run, including compaction requests from the coordinator
+         * and the hidden committable from the writer.
+         */
+        HANDLING_STATE,
+        /**
+         * All states of the previous run are handled, while some results need to be appended to the
+         * next summary. All committables can be passed through now.
+         */
+        PASSING_THROUGH_COMMITTABLE,
+        /**
+         * All states of the previous run are handled and all results are sent. The job of this
+         * operator is done and everything can be passed through directly.
+         */
+        PASSING_THROUGH_ALL
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
index 6b7f397..09af83e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
@@ -308,28 +308,26 @@ public class CompactorOperatorTest extends AbstractCompactTestBase {
                                     new CommittableWithLineage<>(
                                             committable("0", "7", 8), 3L, 0))));
 
-            Assert.assertTrue(handler.isWriterStateDrained());
-            Assert.assertFalse(handler.isStateDrained());
+            Assert.assertTrue(handler.isPassingThroughCommittable());
+            Assert.assertFalse(handler.isPassingThroughAll());
 
-            // the result should not be emitted yet, but all requests should already be submitted
-            Assert.assertEquals(0, harness.extractOutputValues().size());
+            harness.processElement(
+                    new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 4L, 0, 0, 0))));
 
-            compactor.getAllTasksFuture().join();
-            // state should be drained, and all results and holding messages should be emitted
-            harness.prepareSnapshotPreBarrier(3);
+            harness.processElement(
+                    new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 5L, 3, 3, 0))));
 
-            Assert.assertTrue(handler.isStateDrained());
+            Assert.assertTrue(handler.isPassingThroughAll());
 
-            // summary should be merged into one
-            // 1 summary+ 1 compacted + (1 compacted committable + 1 compacted cleanup) * 7
+            // 1 summary + (1 compacted committable + 1 compacted cleanup) * 6 + 1 hidden + 1 normal
+            // + 1 summary + 1 cleanup + 1 summary
             List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
-            Assert.assertEquals(16, results.size());
+            Assert.assertEquals(18, results.size());
             SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
-                    .hasPendingCommittables(15);
+                    .hasPendingCommittables(14);
 
             List<FileSinkCommittable> expectedResult =
                     Arrays.asList(
-                            committable("0", "7", 8),
                             committable("0", "compacted-0", 1),
                             cleanupPath("0", ".0"),
                             committable("0", "compacted-1", 2),
@@ -343,12 +341,120 @@ public class CompactorOperatorTest extends AbstractCompactTestBase {
                             committable("0", "compacted-5", 6),
                             cleanupPath("0", ".5"),
                             committable("0", "compacted-6", 7),
-                            cleanupPath("0", ".6"));
+                            committable("0", "7", 8));
 
-            for (int i = 1; i < results.size(); ++i) {
-                SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i))
-                        .hasCommittable(expectedResult.get(i - 1));
+            for (int i = 0; i < expectedResult.size(); ++i) {
+                SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i + 1))
+                        .hasCommittable(expectedResult.get(i));
             }
+
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(15))
+                    .hasPendingCommittables(1);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(16))
+                    .hasCommittable(cleanupPath("0", ".6"));
+
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(17))
+                    .hasPendingCommittables(3);
+        }
+    }
+
+    @Test
+    public void testStateHandlerRestore() throws Exception {
+        OperatorSubtaskState state;
+        try (OneInputStreamOperatorTestHarness<
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                        CommittableMessage<FileSinkCommittable>>
+                harness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CompactorOperatorStateHandler(
+                                        getTestCommittableSerializer(),
+                                        createTestBucketWriter()))) {
+            harness.setup();
+            harness.open();
+
+            // remaining request from coordinator
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Right(
+                                    request(
+                                                    "0",
+                                                    Collections.singletonList(
+                                                            committable("0", ".1", 1)),
+                                                    null)
+                                            .getValue())));
+
+            // process only summary during cp1, unaligned barrier may be processed ahead of the
+            // elements
+            harness.processElement(
+                    new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 1L, 2, 2, 0))));
+
+            state = harness.snapshot(1, 1L);
+
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(3, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(4);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(committable("0", "compacted-1", 1));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2))
+                    .hasCommittable(cleanupPath("0", ".1"));
+        }
+
+        try (OneInputStreamOperatorTestHarness<
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                        CommittableMessage<FileSinkCommittable>>
+                harness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CompactorOperatorStateHandler(
+                                        getTestCommittableSerializer(),
+                                        createTestBucketWriter()))) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Left(
+                                    new CommittableWithLineage<>(
+                                            committable("0", ".2", 2), 1L, 0))));
+
+            harness.processElement(
+                    new StreamRecord<>(
+                            Either.Left(
+                                    new CommittableWithLineage<>(
+                                            committable("0", "3", 3), 1L, 0))));
+
+            state = harness.snapshot(2, 2L);
+
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(2, results.size());
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(0))
+                    .hasCommittable(committable("0", "2", 2));
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(committable("0", "3", 3));
+        }
+
+        try (OneInputStreamOperatorTestHarness<
+                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                        CommittableMessage<FileSinkCommittable>>
+                harness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CompactorOperatorStateHandler(
+                                        getTestCommittableSerializer(),
+                                        createTestBucketWriter()))) {
+            harness.setup();
+            harness.initializeState(state);
+            harness.open();
+
+            harness.processElement(
+                    new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 2L, 0, 0, 0))));
+
+            List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+            Assert.assertEquals(2, results.size());
+            SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+                    .hasPendingCommittables(1);
+            SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+                    .hasCommittable(cleanupPath("0", ".2"));
         }
     }