You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/08 06:09:35 UTC
[flink] branch master updated: [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 13b203f [FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.
13b203f is described below
commit 13b203fef748bdbe9b1d14ba01f23ca6c6b24b7e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Fri Mar 4 11:24:44 2022 +0800
[FLINK-26440][connector/filesystem] Make CompactorOperatorStateHandler supporting unaligned checkpoint.
This closes #18955.
---
.../operator/CompactorOperatorStateHandler.java | 354 +++++++++++----------
.../file/sink/compactor/CompactorOperatorTest.java | 140 +++++++-
2 files changed, 314 insertions(+), 180 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
index 70e6458..37d83ec 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
@@ -23,16 +23,15 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
-import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.RemainingRequestsSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
-import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -41,6 +40,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -61,31 +63,12 @@ public class CompactorOperatorStateHandler
private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
private final BucketWriter<?, String> bucketWriter;
-
- private final FileCompactor fileCompactor;
+ private State state = State.HANDLING_STATE;
private transient CompactService compactService;
-
- // Flag indicating the in-progress file of the previous run from the writer has been received
- // and processed.
- private boolean writerStateDrained = false;
-
- // Flag indicating all compaction related states are drained, the operator can now pass through
- // everything.
- private boolean stateDrained = false;
-
- // There may be a in-progress file of the previous run that we have to process as a compaction
- // request first, or the file is invisible after committing.
- // We have to hold the summary and committables (of this run), and send them along with the
- // result of this compaction request, as well as the results of the remaining requests of this
- // operator, if there are.
- private CommittableSummary<FileSinkCommittable> holdingSummary;
- private List<CommittableMessage<FileSinkCommittable>> holdingMessages;
- private final List<CommittableMessage<FileSinkCommittable>> compactingMessages =
- new ArrayList<>();
-
private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
- compactingRequests = new ArrayList<>();
+ compactingRequests = new LinkedList<>();
+
private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining;
@@ -94,14 +77,25 @@ public class CompactorOperatorStateHandler
BucketWriter<?, String> bucketWriter) {
this.committableSerializer = committableSerializer;
this.bucketWriter = bucketWriter;
+ }
- this.fileCompactor = new IdenticalFileCompactor();
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ // CompactService is not initialized yet, we can not submit requests here.
+ remainingRequestsState =
+ new SimpleVersionedListState<>(
+ context.getOperatorStateStore()
+ .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+ new RemainingRequestsSerializer(
+ new CompactorRequestSerializer(committableSerializer)));
+ stateRemaining = remainingRequestsState.get();
}
@Override
public void open() throws Exception {
super.open();
- this.compactService = new CompactService(1, fileCompactor, bucketWriter);
+ this.compactService = new CompactService(1, new IdenticalFileCompactor(), bucketWriter);
compactService.open();
if (stateRemaining != null) {
@@ -122,12 +116,14 @@ public class CompactorOperatorStateHandler
for (FileSinkCommittable toCompact : toCompactList) {
CompactorRequest compactRequest = new CompactorRequest(bucketId);
compactRequest.addToCompact(toCompact);
- submit(compactRequest);
+ compactingRequests.add(
+ new Tuple2<>(compactRequest, submit(compactRequest)));
}
CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
toPassThrough.forEach(passThroughRequest::addToPassthrough);
- submit(passThroughRequest);
+ compactingRequests.add(
+ new Tuple2<>(passThroughRequest, submit(passThroughRequest)));
}
}
}
@@ -141,77 +137,137 @@ public class CompactorOperatorStateHandler
throws Exception {
Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record =
element.getValue();
- if (stateDrained) {
+ if (state == State.PASSING_THROUGH_ALL) {
// all input should be committable messages to pass through
output.collect(new StreamRecord<>(record.left()));
return;
}
+ if (state == State.PASSING_THROUGH_COMMITTABLE) {
+ checkState(
+ record.isLeft(),
+ "Compacting requests is not expected once a normal committable is received.");
+ CommittableMessage<FileSinkCommittable> message = record.left();
+ if (message instanceof CommittableWithLineage) {
+ checkState(
+ !isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message),
+ "Hidden committable is not expected once a normal committable is received.");
+ output.collect(new StreamRecord<>(message));
+ } else {
+ // Message is summary
+ if (compactingRequests.isEmpty()) {
+ // No compacting requests remained
+ state = State.PASSING_THROUGH_ALL;
+ output.collect(new StreamRecord<>(message));
+ return;
+ }
+ appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message);
+ }
+ return;
+ }
+
if (record.isRight()) {
- submit(element.getValue().right());
+ CompactorRequest request = element.getValue().right();
+ compactingRequests.add(new Tuple2<>(request, submit(request)));
return;
}
CommittableMessage<FileSinkCommittable> message = record.left();
- if (message instanceof CommittableSummary) {
- checkState(holdingSummary == null, "Duplicate summary before the first checkpoint.");
- holdingSummary = (CommittableSummary<FileSinkCommittable>) message;
- holdingMessages = new ArrayList<>(holdingSummary.getNumberOfCommittables());
- } else {
- boolean compacting = false;
- CommittableWithLineage<FileSinkCommittable> committableWithLineage =
- (CommittableWithLineage<FileSinkCommittable>) message;
- if (committableWithLineage.getCommittable().hasPendingFile()) {
- FileSinkCommittable committable = committableWithLineage.getCommittable();
- PendingFileRecoverable pendingFile = committable.getPendingFile();
- if (pendingFile.getPath() != null
- && pendingFile.getPath().getName().startsWith(".")) {
- // The pending file is the in-progress file of the previous run, which
- // should be committed and compacted before sending to the committer.
- CompactorRequest request = new CompactorRequest(committable.getBucketId());
- request.addToCompact(committable);
- submit(request);
-
- compacting = true;
- compactingMessages.add(message);
- } else {
- // A normal file is received, indicating the writer state is drained.
- writerStateDrained = true;
- if (compactingMessages.isEmpty() && compactingRequests.isEmpty()) {
- // No state needs to be handled, the holding summary and all committable
- // messages can be sent eagerly
- checkState(holdingSummary != null);
- output.collect(new StreamRecord<>(holdingSummary));
- holdingSummary = null;
-
- this.stateDrained = true;
- output.collect(new StreamRecord<>(committableWithLineage));
- }
- }
+ if (message instanceof CommittableWithLineage) {
+ if (isHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message)) {
+ handleHiddenCommittable((CommittableWithLineage<FileSinkCommittable>) message);
+ } else {
+ // No more hidden committable is expected
+ state = State.PASSING_THROUGH_COMMITTABLE;
+ output.collect(new StreamRecord<>(message));
}
- if (!compacting && !stateDrained) {
- // Compacting messages should not be added
- // If the state is drained, no further messages need to be added
- holdingMessages.add(message);
+ } else {
+ if (compactingRequests.isEmpty()) {
+ output.collect(new StreamRecord<>(message));
+ return;
}
+ appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message);
}
}
- @Override
- public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
- super.prepareSnapshotPreBarrier(checkpointId);
- if (stateDrained) {
- return;
+ private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommittable> summary)
+ throws ExecutionException, InterruptedException {
+ // To guarantee the order, we have to wait for all results here.
+ List<FileSinkCommittable> results = new ArrayList<>();
+ for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t :
+ compactingRequests) {
+ t.f1.get().forEach(results::add);
+ }
+ compactingRequests.clear();
+
+ // Append the results to the summary and send them following it
+ output.collect(
+ new StreamRecord<>(
+ new CommittableSummary<>(
+ summary.getSubtaskId(),
+ summary.getNumberOfSubtasks(),
+ getCheckpointId(summary),
+ summary.getNumberOfCommittables() + results.size(),
+ summary.getNumberOfPendingCommittables() + results.size(),
+ summary.getNumberOfFailedCommittables())));
+ for (FileSinkCommittable committable : results) {
+ output.collect(
+ new StreamRecord<>(
+ new CommittableWithLineage<>(
+ committable,
+ getCheckpointId(summary),
+ summary.getSubtaskId())));
+ }
+ }
+
+ private boolean isHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message) {
+ return message.getCommittable().hasPendingFile()
+ && message.getCommittable().getPendingFile().getPath() != null
+ && message.getCommittable().getPendingFile().getPath().getName().startsWith(".");
+ }
+
+ private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message)
+ throws ExecutionException, InterruptedException {
+ FileSinkCommittable committable = message.getCommittable();
+
+ // The pending file is the in-progress file of the previous run, which
+ // should be committed and compacted before sending to the committer.
+ CompactorRequest request = new CompactorRequest(committable.getBucketId());
+ request.addToCompact(committable);
+
+ // Wait for the result synchronously and pass through the result, but append the
+ // cleanup request to the next summary, since the count of pending committables
+ // for this checkpoint is immutable now.
+ Iterable<FileSinkCommittable> result = submit(request).get();
+ Long checkpointId = getCheckpointId(message);
+ boolean pendingFileSent = false;
+ for (FileSinkCommittable c : result) {
+ if (c.hasPendingFile()) {
+ checkState(
+ !pendingFileSent,
+ "A in-progress file should not be converted to multiple pending files");
+ pendingFileSent = true;
+ output.collect(
+ new StreamRecord<>(
+ new CommittableWithLineage<>(
+ c, checkpointId, message.getSubtaskId())));
+ } else {
+ // Wrap the cleanup request as a pass-through request and keep it in the
+ // compacting requests.
+ // These requests will be appended to the next summary, if there is one.
+ CompactorRequest passThroughRequest = new CompactorRequest(c.getBucketId());
+ passThroughRequest.addToPassthrough(c);
+ compactingRequests.add(
+ new Tuple2<>(passThroughRequest, submit(passThroughRequest)));
+ }
}
- drain();
- // The operator is stateless once drain is called. snapshotState is not necessary.
}
@Override
public void endInput() throws Exception {
- if (!stateDrained) {
- drain();
- }
+ // Although there may be remaining cleanup requests in compactingRequests, there is no way
+ // to let the Committer accept them, since the EOI summary has already been sent.
+ // For now we can do nothing but leave them there.
}
@Override
@@ -222,102 +278,74 @@ public class CompactorOperatorStateHandler
}
}
- private void submit(CompactorRequest request) {
- CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
- compactService.submit(request, resultFuture);
- compactingRequests.add(new Tuple2<>(request, resultFuture));
- }
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ if (compactingRequests.isEmpty()) {
+ return;
+ }
- private void drain() throws ExecutionException, InterruptedException {
- checkState(holdingSummary != null);
- checkState(
- holdingSummary.getNumberOfPendingCommittables()
- == holdingSummary.getNumberOfCommittables()
- && holdingSummary.getNumberOfCommittables()
- == holdingMessages.size() + compactingMessages.size());
-
- Long checkpointId =
- holdingSummary.getCheckpointId().isPresent()
- ? holdingSummary.getCheckpointId().getAsLong()
- : null;
- int subtaskId = holdingSummary.getSubtaskId();
-
- if (!compactingRequests.isEmpty()) {
- CompletableFuture.allOf(
- compactingRequests.stream()
- .map(r -> r.f1)
- .toArray(CompletableFuture[]::new))
- .join();
-
- for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>
- compacting : compactingRequests) {
- CompletableFuture<Iterable<FileSinkCommittable>> future = compacting.f1;
- checkState(future.isDone());
- // Exception is thrown if it's completed exceptionally
- for (FileSinkCommittable c : future.get()) {
- holdingMessages.add(new CommittableWithLineage<>(c, checkpointId, subtaskId));
+ // Results of some requests are not drained by a summary. They should be reserved in the
+ // state and wait for the next summary.
+
+ List<CompactorRequest> remainingRequests = new ArrayList<>();
+ for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> t :
+ compactingRequests) {
+ if (t.f1.isDone()) {
+ // We can add the results as a pass-through request if the compaction is done
+ Iterable<FileSinkCommittable> result = t.f1.get();
+ if (result.iterator().hasNext()) {
+ String bucketId = result.iterator().next().getBucketId();
+ CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
+ result.forEach(passThroughRequest::addToPassthrough);
+ remainingRequests.add(passThroughRequest);
}
+ } else {
+ // Or we add the original request in the state
+ remainingRequests.add(t.f0);
}
}
-
- // Appending the compacted committable to the holding summary
- CommittableSummary<FileSinkCommittable> summary =
- new CommittableSummary<>(
- holdingSummary.getSubtaskId(),
- holdingSummary.getNumberOfSubtasks(),
- holdingSummary.getCheckpointId().isPresent()
- ? holdingSummary.getCheckpointId().getAsLong()
- : null,
- holdingMessages.size(),
- holdingMessages.size(),
- holdingSummary.getNumberOfFailedCommittables());
- output.collect(new StreamRecord<>(summary));
- for (CommittableMessage<FileSinkCommittable> committable : holdingMessages) {
- output.collect(new StreamRecord<>(committable));
- }
-
- // Remaining requests should be all done and their results are all emitted.
- // From now on the operator is stateless.
- remainingRequestsState.clear();
-
- compactingRequests.clear();
- compactingMessages.clear();
- holdingSummary = null;
- holdingMessages = null;
-
- if (writerStateDrained) {
- // We can pass through everything if the writer state is also drained.
- stateDrained = true;
- compactService.close();
- compactService = null;
- }
+ Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>();
+ requestsMap.put(-1L, remainingRequests);
+ remainingRequestsState.update(Collections.singletonList(requestsMap));
}
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
-
- remainingRequestsState =
- new SimpleVersionedListState<>(
- context.getOperatorStateStore()
- .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
- new RemainingRequestsSerializer(
- new CompactorRequestSerializer(committableSerializer)));
-
- stateRemaining = remainingRequestsState.get();
+ private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) {
+ return message.getCheckpointId().isPresent() ? message.getCheckpointId().getAsLong() : null;
+ }
- // stateDrained can not be determined here, since even if the stateRemaining is empty,
- // there may still be some requests from the coordinator and a in-progress file in the file
- // writer
+ private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) {
+ CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
+ compactService.submit(request, resultFuture);
+ return resultFuture;
}
@VisibleForTesting
- public boolean isWriterStateDrained() {
- return writerStateDrained;
+ public boolean isPassingThroughCommittable() {
+ return state == State.PASSING_THROUGH_COMMITTABLE;
}
@VisibleForTesting
- public boolean isStateDrained() {
- return stateDrained;
+ public boolean isPassingThroughAll() {
+ return state == State.PASSING_THROUGH_ALL;
+ }
+
+ /** The handling state of this operator. */
+ private enum State {
+ /**
+ * Handling states of the previous run, including compaction requests from the coordinator
+ * and the hidden committable from the writer.
+ */
+ HANDLING_STATE,
+ /**
+ * All states of the previous run are handled, while some results need to be appended to the
+ * next summary. All committables can be passed through now.
+ */
+ PASSING_THROUGH_COMMITTABLE,
+ /**
+ * All states of the previous run are handled and all results are sent. The job of this
+ * operator is done and everything can be passed through directly.
+ */
+ PASSING_THROUGH_ALL
}
}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
index 6b7f397..09af83e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java
@@ -308,28 +308,26 @@ public class CompactorOperatorTest extends AbstractCompactTestBase {
new CommittableWithLineage<>(
committable("0", "7", 8), 3L, 0))));
- Assert.assertTrue(handler.isWriterStateDrained());
- Assert.assertFalse(handler.isStateDrained());
+ Assert.assertTrue(handler.isPassingThroughCommittable());
+ Assert.assertFalse(handler.isPassingThroughAll());
- // the result should not be emitted yet, but all requests should already be submitted
- Assert.assertEquals(0, harness.extractOutputValues().size());
+ harness.processElement(
+ new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 4L, 0, 0, 0))));
- compactor.getAllTasksFuture().join();
- // state should be drained, and all results and holding messages should be emitted
- harness.prepareSnapshotPreBarrier(3);
+ harness.processElement(
+ new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 5L, 3, 3, 0))));
- Assert.assertTrue(handler.isStateDrained());
+ Assert.assertTrue(handler.isPassingThroughAll());
- // summary should be merged into one
- // 1 summary+ 1 compacted + (1 compacted committable + 1 compacted cleanup) * 7
+ // 1 summary + (1 compacted committable + 1 compacted cleanup) * 6 + 1 hidden + 1 normal
+ // + 1 summary + 1 cleanup + 1 summary
List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
- Assert.assertEquals(16, results.size());
+ Assert.assertEquals(18, results.size());
SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
- .hasPendingCommittables(15);
+ .hasPendingCommittables(14);
List<FileSinkCommittable> expectedResult =
Arrays.asList(
- committable("0", "7", 8),
committable("0", "compacted-0", 1),
cleanupPath("0", ".0"),
committable("0", "compacted-1", 2),
@@ -343,12 +341,120 @@ public class CompactorOperatorTest extends AbstractCompactTestBase {
committable("0", "compacted-5", 6),
cleanupPath("0", ".5"),
committable("0", "compacted-6", 7),
- cleanupPath("0", ".6"));
+ committable("0", "7", 8));
- for (int i = 1; i < results.size(); ++i) {
- SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i))
- .hasCommittable(expectedResult.get(i - 1));
+ for (int i = 0; i < expectedResult.size(); ++i) {
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(i + 1))
+ .hasCommittable(expectedResult.get(i));
}
+
+ SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(15))
+ .hasPendingCommittables(1);
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(16))
+ .hasCommittable(cleanupPath("0", ".6"));
+
+ SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(17))
+ .hasPendingCommittables(3);
+ }
+ }
+
+ @Test
+ public void testStateHandlerRestore() throws Exception {
+ OperatorSubtaskState state;
+ try (OneInputStreamOperatorTestHarness<
+ Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+ CommittableMessage<FileSinkCommittable>>
+ harness =
+ new OneInputStreamOperatorTestHarness<>(
+ new CompactorOperatorStateHandler(
+ getTestCommittableSerializer(),
+ createTestBucketWriter()))) {
+ harness.setup();
+ harness.open();
+
+ // remaining request from coordinator
+ harness.processElement(
+ new StreamRecord<>(
+ Either.Right(
+ request(
+ "0",
+ Collections.singletonList(
+ committable("0", ".1", 1)),
+ null)
+ .getValue())));
+
+ // process only summary during cp1, unaligned barrier may be processed ahead of the
+ // elements
+ harness.processElement(
+ new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 1L, 2, 2, 0))));
+
+ state = harness.snapshot(1, 1L);
+
+ List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+ Assert.assertEquals(3, results.size());
+ SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+ .hasPendingCommittables(4);
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+ .hasCommittable(committable("0", "compacted-1", 1));
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(2))
+ .hasCommittable(cleanupPath("0", ".1"));
+ }
+
+ try (OneInputStreamOperatorTestHarness<
+ Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+ CommittableMessage<FileSinkCommittable>>
+ harness =
+ new OneInputStreamOperatorTestHarness<>(
+ new CompactorOperatorStateHandler(
+ getTestCommittableSerializer(),
+ createTestBucketWriter()))) {
+ harness.setup();
+ harness.initializeState(state);
+ harness.open();
+
+ harness.processElement(
+ new StreamRecord<>(
+ Either.Left(
+ new CommittableWithLineage<>(
+ committable("0", ".2", 2), 1L, 0))));
+
+ harness.processElement(
+ new StreamRecord<>(
+ Either.Left(
+ new CommittableWithLineage<>(
+ committable("0", "3", 3), 1L, 0))));
+
+ state = harness.snapshot(2, 2L);
+
+ List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+ Assert.assertEquals(2, results.size());
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(0))
+ .hasCommittable(committable("0", "2", 2));
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+ .hasCommittable(committable("0", "3", 3));
+ }
+
+ try (OneInputStreamOperatorTestHarness<
+ Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+ CommittableMessage<FileSinkCommittable>>
+ harness =
+ new OneInputStreamOperatorTestHarness<>(
+ new CompactorOperatorStateHandler(
+ getTestCommittableSerializer(),
+ createTestBucketWriter()))) {
+ harness.setup();
+ harness.initializeState(state);
+ harness.open();
+
+ harness.processElement(
+ new StreamRecord<>(Either.Left(new CommittableSummary<>(0, 1, 2L, 0, 0, 0))));
+
+ List<CommittableMessage<FileSinkCommittable>> results = harness.extractOutputValues();
+ Assert.assertEquals(2, results.size());
+ SinkV2Assertions.assertThat((CommittableSummary<?>) results.get(0))
+ .hasPendingCommittables(1);
+ SinkV2Assertions.assertThat((CommittableWithLineage<?>) results.get(1))
+ .hasCommittable(cleanupPath("0", ".2"));
}
}