You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/03 03:41:33 UTC

[flink] branch master updated (adc5849 -> 2c9d64e)

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from adc5849  Revert "[FLINK-25659][table-planner] Enable flaky LogicalRelDataTypeConverterTest with debug information"
     new 64367a2  Revert "[FLINK-26314][connectors/filesystem] Disable unaligned checkpoints for StreamingExecutionFileSinkITCase."
     new 2c9d64e  [FLINK-26403][datastream] Fixes the endOfInput logic of sink writer and committer.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sink/StreamingCompactingFileSinkITCase.java    |  8 ---
 .../sink/StreamingExecutionFileSinkITCase.java     |  3 --
 .../runtime/operators/sink/CommitterOperator.java  | 57 +++++++++++-----------
 .../operators/sink/CommitterOperatorFactory.java   | 13 +++--
 .../runtime/operators/sink/SinkWriterOperator.java | 41 +++-------------
 .../operators/sink/SinkWriterOperatorFactory.java  | 14 +-----
 .../translators/SinkTransformationTranslator.java  |  9 ++--
 .../operators/sink/CommitterOperatorTest.java      | 32 ++++++++----
 .../operators/sink/SinkWriterOperatorTest.java     | 37 ++++----------
 9 files changed, 82 insertions(+), 132 deletions(-)

[flink] 02/02: [FLINK-26403][datastream] Fixes the endOfInput logic of sink writer and committer.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c9d64e34cfd3025c87c1d3bbd2d1b596df11691
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Mar 1 17:48:16 2022 +0800

    [FLINK-26403][datastream] Fixes the endOfInput logic of sink writer and committer.
    
    SinkWriterOperator should emit all the pending committables on endOfInput,
    and CommitterOperator should commit all committables when the final checkpoint
    is completed or on endOfInput if there's no final checkpoint.
    
    This closes #18938.
---
 .../runtime/operators/sink/CommitterOperator.java  | 57 +++++++++++-----------
 .../operators/sink/CommitterOperatorFactory.java   | 13 +++--
 .../runtime/operators/sink/SinkWriterOperator.java | 41 +++-------------
 .../operators/sink/SinkWriterOperatorFactory.java  | 14 +-----
 .../translators/SinkTransformationTranslator.java  |  9 ++--
 .../operators/sink/CommitterOperatorTest.java      | 32 ++++++++----
 .../operators/sink/SinkWriterOperatorTest.java     | 37 ++++----------
 7 files changed, 82 insertions(+), 121 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 9cb5a4a..ed74daa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -64,12 +64,12 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final Committer<CommT> committer;
     private final boolean emitDownstream;
-    private final boolean isCheckpointingOrBatchModeEnabled;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
     private CommittableCollector<CommT> committableCollector;
     private long lastCompletedCheckpointId = -1;
 
     private boolean endInput = false;
-    private boolean finalEmission = false;
 
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -84,9 +84,11 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
             SimpleVersionedSerializer<CommT> committableSerializer,
             Committer<CommT> committer,
             boolean emitDownstream,
-            boolean isCheckpointingOrBatchModeEnabled) {
+            boolean isBatchMode,
+            boolean isCheckpointingEnabled) {
         this.emitDownstream = emitDownstream;
-        this.isCheckpointingOrBatchModeEnabled = isCheckpointingOrBatchModeEnabled;
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
         this.processingTimeService = checkNotNull(processingTimeService);
         this.committableSerializer = checkNotNull(committableSerializer);
         this.committer = checkNotNull(committer);
@@ -130,43 +132,40 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     @Override
     public void endInput() throws Exception {
         endInput = true;
-        final CommittableManager<CommT> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        // indicates batch
-        if (endOfInputCommittable != null) {
-            do {
-                commitAndEmit(endOfInputCommittable, false);
-            } while (!committableCollector.isFinished());
-        }
-        if (!isCheckpointingOrBatchModeEnabled) {
-            notifyCheckpointComplete(
-                    lastCompletedCheckpointId == -1 ? 1 : lastCompletedCheckpointId + 1);
+        if (!isCheckpointingEnabled || isBatchMode) {
+            // There will be no final checkpoint, all committables should be committed here
+            notifyCheckpointComplete(Long.MAX_VALUE);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // If a streaming job finishes and a savepoint is triggered afterwards we do not want to
-        // flush again
-        if (finalEmission) {
-            return;
-        }
+        super.notifyCheckpointComplete(checkpointId);
         if (endInput) {
-            finalEmission = true;
+            // This is the final checkpoint, all committables should be committed
+            lastCompletedCheckpointId = Long.MAX_VALUE;
+        } else {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
         }
-        super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
         commitAndEmitCheckpoints();
     }
 
     private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> manager :
-                committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
-            // wait for all committables of the current manager before submission
-            boolean fullyReceived = manager.getCheckpointId() == lastCompletedCheckpointId;
-            commitAndEmit(manager, fullyReceived);
-        }
+        do {
+            for (CheckpointCommittableManager<CommT> manager :
+                    committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+                // wait for all committables of the current manager before submission
+                boolean fullyReceived =
+                        !endInput && manager.getCheckpointId() == lastCompletedCheckpointId;
+                commitAndEmit(manager, fullyReceived);
+            }
+            // !committableCollector.isFinished() indicates that we should retry.
+            // If this is the final checkpoint (indicated by endInput), retry right here.
+            // WARN: this retry is unbounded and may leave the job stuck while finishing.
+        } while (!committableCollector.isFinished() && endInput);
+
         if (!committableCollector.isFinished()) {
+            // if not endInput, we can schedule retrying later
             retryWithDelay();
         }
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index f2327c9..8f2056f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -42,12 +42,16 @@ public final class CommitterOperatorFactory<CommT>
                 CommittableMessage<CommT>, CommittableMessage<CommT>> {
 
     private final TwoPhaseCommittingSink<?, CommT> sink;
-    private final boolean isCheckpointingOrBatchModeEnabled;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
 
     public CommitterOperatorFactory(
-            TwoPhaseCommittingSink<?, CommT> sink, boolean isCheckpointingOrBatchModeEnabled) {
+            TwoPhaseCommittingSink<?, CommT> sink,
+            boolean isBatchMode,
+            boolean isCheckpointingEnabled) {
         this.sink = checkNotNull(sink);
-        this.isCheckpointingOrBatchModeEnabled = isCheckpointingOrBatchModeEnabled;
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
     }
 
     @Override
@@ -62,7 +66,8 @@ public final class CommitterOperatorFactory<CommT>
                             sink.getCommittableSerializer(),
                             sink.createCommitter(),
                             sink instanceof WithPostCommitTopology,
-                            isCheckpointingOrBatchModeEnabled);
+                            isBatchMode,
+                            isCheckpointingEnabled);
             committerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index edef318..564f0aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -70,8 +70,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     private final Context<InputT> context;
 
     private final boolean emitDownstream;
-    private final boolean isBatchMode;
-    private final boolean isCheckpointingEnabled;
 
     // ------------------------------- runtime fields ---------------------------------------
 
@@ -80,23 +78,16 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
 
     private SinkWriter<InputT> sinkWriter;
 
-    private OptionalLong restoredCheckpointId = OptionalLong.empty();
-
     private final SinkWriterStateHandler<InputT> writerStateHandler;
 
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
-    private boolean finalEmission = false;
 
     SinkWriterOperator(
             Sink<InputT> sink,
             ProcessingTimeService processingTimeService,
-            MailboxExecutor mailboxExecutor,
-            boolean isBatchMode,
-            boolean isCheckpointingEnabled) {
-        this.isBatchMode = isBatchMode;
-        this.isCheckpointingEnabled = isCheckpointingEnabled;
+            MailboxExecutor mailboxExecutor) {
         this.processingTimeService = checkNotNull(processingTimeService);
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
         this.context = new Context<>();
@@ -114,7 +105,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
         OptionalLong checkpointId = context.getRestoredCheckpointId();
-        restoredCheckpointId = checkpointId;
         InitContext initContext =
                 createInitContext(checkpointId.isPresent() ? checkpointId.getAsLong() : null);
 
@@ -135,17 +125,12 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        // If a streaming job finishes and a savepoint is triggered afterwards we do not want to
-        // flush again
-        if (finalEmission) {
-            return;
-        }
-        if (endOfInput) {
-            finalEmission = true;
-        }
         super.prepareSnapshotPreBarrier(checkpointId);
-        sinkWriter.flush(endOfInput);
-        emitCommittables(checkpointId);
+        if (!endOfInput) {
+            sinkWriter.flush(false);
+            emitCommittables(checkpointId);
+        }
+        // no records are expected to be emitted after endOfInput
     }
 
     @Override
@@ -159,18 +144,8 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Override
     public void endInput() throws Exception {
         endOfInput = true;
-        // Only in batch mode we want to emit with the Long.MAX_VALUE checkpoint id. In streaming
-        // mode there will be a final checkpoint after endInput that flushes all pending
-        // committables.
-        if (isBatchMode) {
-            sinkWriter.flush(true);
-            emitCommittables(Long.MAX_VALUE);
-            return;
-        }
-        // There will be no final checkpoint but the job runs in streaming mode, so we try to commit
-        if (!isCheckpointingEnabled) {
-            prepareSnapshotPreBarrier(restoredCheckpointId.orElse(0) + 1);
-        }
+        sinkWriter.flush(true);
+        emitCommittables(Long.MAX_VALUE);
     }
 
     private void emitCommittables(Long checkpointId) throws IOException, InterruptedException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
index f36880d..4ad16c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
@@ -45,26 +45,16 @@ public final class SinkWriterOperatorFactory<InputT, CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
 
     private final Sink<InputT> sink;
-    private final boolean isBatchMode;
-    private final boolean isCheckpointingEnabled;
 
-    public SinkWriterOperatorFactory(
-            Sink<InputT> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
+    public SinkWriterOperatorFactory(Sink<InputT> sink) {
         this.sink = checkNotNull(sink);
-        this.isBatchMode = isBatchMode;
-        this.isCheckpointingEnabled = isCheckpointingEnabled;
     }
 
     public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
         try {
             final SinkWriterOperator<InputT, CommT> writerOperator =
-                    new SinkWriterOperator<>(
-                            sink,
-                            processingTimeService,
-                            getMailboxExecutor(),
-                            isBatchMode,
-                            isCheckpointingEnabled);
+                    new SinkWriterOperator<>(sink, processingTimeService, getMailboxExecutor());
             writerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 30d5316..09240c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -140,8 +140,7 @@ public class SinkTransformationTranslator<Input, Output>
                                 input.transform(
                                         WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
-                                        new SinkWriterOperatorFactory<>(
-                                                sink, isBatchMode, isCheckpointingEnabled)));
+                                        new SinkWriterOperatorFactory<>(sink)));
             }
 
             final List<Transformation<?>> sinkTransformations =
@@ -172,8 +171,7 @@ public class SinkTransformationTranslator<Input, Output>
                                     input.transform(
                                             WRITER_NAME,
                                             typeInformation,
-                                            new SinkWriterOperatorFactory<>(
-                                                    sink, isBatchMode, isCheckpointingEnabled)));
+                                            new SinkWriterOperatorFactory<>(sink)));
 
             DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
 
@@ -193,7 +191,8 @@ public class SinkTransformationTranslator<Input, Output>
                                             typeInformation,
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
-                                                    isBatchMode || isCheckpointingEnabled)));
+                                                    isBatchMode,
+                                                    isCheckpointingEnabled)));
 
             if (sink instanceof WithPostCommitTopology) {
                 DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
index 06b44dc..204d664 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
@@ -72,7 +72,7 @@ class CommitterOperatorTest {
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
-                                        (TwoPhaseCommittingSink<?, String>) sink, true));
+                                        (TwoPhaseCommittingSink<?, String>) sink, false, true));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -105,7 +105,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, false, true);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -146,7 +146,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, false, true);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -176,12 +176,13 @@ class CommitterOperatorTest {
         testHarness.close();
     }
 
-    @Test
-    void testEmitAllCommittablesOnEndOfInput() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, isBatchMode, !isBatchMode);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -197,6 +198,11 @@ class CommitterOperatorTest {
         testHarness.processElement(new StreamRecord<>(second));
 
         testHarness.endInput();
+        if (!isBatchMode) {
+            assertThat(testHarness.getOutput()).hasSize(0);
+            // notify final checkpoint complete
+            testHarness.notifyOfCompletedCheckpoint(1);
+        }
 
         final List<StreamElement> output = fromOutput(testHarness.getOutput());
         assertThat(output).hasSize(3);
@@ -215,7 +221,7 @@ class CommitterOperatorTest {
     void testStateRestore() throws Exception {
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(new TestSink.RetryOnceCommitter());
+                testHarness = createTestHarness(new TestSink.RetryOnceCommitter(), false, true);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -235,7 +241,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                restored = createTestHarness(committer);
+                restored = createTestHarness(committer, false, true);
 
         restored.initializeState(snapshot);
         restored.open();
@@ -271,6 +277,7 @@ class CommitterOperatorTest {
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
                                         (TwoPhaseCommittingSink<?, String>) sink,
+                                        false,
                                         isCheckpointingEnabled));
         testHarness.open();
 
@@ -318,7 +325,11 @@ class CommitterOperatorTest {
 
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(Committer<String> committer) throws Exception {
+            createTestHarness(
+                    Committer<String> committer,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled)
+                    throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
                 new CommitterOperatorFactory<>(
                         (TwoPhaseCommittingSink<?, String>)
@@ -329,7 +340,8 @@ class CommitterOperatorTest {
                                                 TestSink.StringCommittableSerializer.INSTANCE)
                                         .build()
                                         .asV2(),
-                        true));
+                        isBatchMode,
+                        isCheckpointingEnabled));
     }
 
     private static class ForwardingCommitter extends TestSink.DefaultCommitter {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
index 3a7b8a9..d71becf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
@@ -57,18 +57,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class SinkWriterOperatorTest {
 
-    private static final boolean STREAMING_MODE = false;
-    private static final boolean CHECKPOINTING_ENABLED = true;
-
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
         final TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>();
         final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                TestSink.newBuilder().setWriter(sinkWriter).build().asV2()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
@@ -90,9 +85,7 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder().setWriter(writer).build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                TestSink.newBuilder().setWriter(writer).build().asV2()));
         testHarness.open();
 
         testHarness.processWatermark(initialTime);
@@ -118,9 +111,7 @@ class SinkWriterOperatorTest {
                                         .setDefaultCommitter()
                                         .setWriter(new TimeBasedBufferingSinkWriter())
                                         .build()
-                                        .asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                        .asV2()));
 
         testHarness.open();
 
@@ -148,9 +139,7 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder().setDefaultCommitter().build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                TestSink.newBuilder().setDefaultCommitter().build().asV2()));
 
         testHarness.open();
         assertThat(testHarness.getOutput()).isEmpty();
@@ -169,7 +158,7 @@ class SinkWriterOperatorTest {
     void testEmitOnEndOfInputInBatchMode() throws Exception {
         final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory =
                 new SinkWriterOperatorFactory<>(
-                        TestSink.newBuilder().setDefaultCommitter().build().asV2(), true, false);
+                        TestSink.newBuilder().setDefaultCommitter().build().asV2());
         final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
 
@@ -223,7 +212,7 @@ class SinkWriterOperatorTest {
         restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
 
         if (stateful) {
-            assertBasicOutput(restoredTestHarness.getOutput(), 2, checkpointId);
+            assertBasicOutput(restoredTestHarness.getOutput(), 2, Long.MAX_VALUE);
         } else {
             assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue())
                     .isInstanceOf(CommittableSummary.class)
@@ -313,9 +302,7 @@ class SinkWriterOperatorTest {
                                         .setWriter(sinkWriter)
                                         .setDefaultCommitter()
                                         .build()
-                                        .asV2(),
-                                STREAMING_MODE,
-                                isCheckpointingEnabled));
+                                        .asV2()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
@@ -329,10 +316,6 @@ class SinkWriterOperatorTest {
             testHarness.prepareSnapshotPreBarrier(1);
         }
 
-        // Ensure after the final emission no emission is possible anymore to prevent empty updates
-        testHarness.prepareSnapshotPreBarrier(2);
-        testHarness.endInput();
-
         assertEmitted(Collections.singletonList(record), testHarness.getOutput());
         assertThat(sinkWriter.elements).isEmpty();
 
@@ -373,8 +356,7 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(
-                        builder.build().asV2(), STREAMING_MODE, CHECKPOINTING_ENABLED);
+                new SinkWriterOperatorFactory<>(builder.build().asV2());
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 
@@ -391,8 +373,7 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(
-                        builder.build().asV2(), STREAMING_MODE, CHECKPOINTING_ENABLED);
+                new SinkWriterOperatorFactory<>(builder.build().asV2());
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 

[flink] 01/02: Revert "[FLINK-26314][connectors/filesystem] Disable unaligned checkpoints for StreamingExecutionFileSinkITCase."

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64367a2f134ba9f174986845479d3fc0560f756f
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Mar 1 17:32:33 2022 +0800

    Revert "[FLINK-26314][connectors/filesystem] Disable unaligned checkpoints for StreamingExecutionFileSinkITCase."
    
    This reverts commit 69276df0
---
 .../connector/file/sink/StreamingCompactingFileSinkITCase.java    | 8 --------
 .../connector/file/sink/StreamingExecutionFileSinkITCase.java     | 3 ---
 2 files changed, 11 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
index de701d1..227d49a 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.I
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.Rule;
@@ -60,13 +59,6 @@ public class StreamingCompactingFileSinkITCase extends StreamingExecutionFileSin
                 .build();
     }
 
-    @Override
-    protected void configureEnvironment(StreamExecutionEnvironment env) {
-        super.configureEnvironment(env);
-        // Disable unaligned checkpoints explicitly to avoid being randomly enabled
-        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
-    }
-
     private static FileCompactor createFileCompactor() {
         return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
     }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
index 6209a46..c9d8f9f 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
@@ -82,7 +82,6 @@ public class StreamingExecutionFileSinkITCase extends FileSinkITBase {
         env.configure(config, getClass().getClassLoader());
 
         env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);
-        configureEnvironment(env);
 
         if (triggerFailover) {
             env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
@@ -99,8 +98,6 @@ public class StreamingExecutionFileSinkITCase extends FileSinkITBase {
         return streamGraph.getJobGraph();
     }
 
-    protected void configureEnvironment(StreamExecutionEnvironment env) {}
-
     // ------------------------ Streaming mode user functions ----------------------------------
 
     private static class StreamingExecutionTestSource extends RichParallelSourceFunction<Integer>