You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/03 19:55:08 UTC

[flink] branch master updated (affbd59 -> 1c19ab7)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from affbd59  [FLINK-19360][scripts] Support spaces in '${JAVA_HOME}'
     new 0b45163  [FLINK-21132][runtime] Don't end input on stop with savepoint
     new 1dda3c8  [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input
     new 5ea6ec4  [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy
     new 61a745b  [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput
     new 1c19ab7  [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/runtime/tasks/OperatorChain.java     |  10 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  15 +-
 .../runtime/tasks/StreamOperatorWrapper.java       |  11 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  25 ++-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  97 ++++++++-
 .../tasks/TestBoundedOneInputStreamOperator.java   |  11 +
 .../flink/test/checkpointing/SavepointITCase.java  | 234 ++++++++++++++++++++-
 8 files changed, 383 insertions(+), 24 deletions(-)


[flink] 01/05: [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b45163645dde0c14dc40a2a5d2df9426239fd5b
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 29 16:46:15 2021 +0100

    [FLINK-21132][runtime] Don't end input on stop with savepoint
    
    EndOfInput was used to handle any stopping of the job. When
    stopping with savepoint the input is not actually ended.
    This causes issues with some sinks (e.g. Iceberg).
    
    With this change, endInput is not called for stop-with-savepoint.
    
    To differentiate stop-with-savepoint from other cases,
    only checkpoints (RPC/barriers) are considered and not network EOP.
    That's enough because EOP is only injected after the CP completion
    (i.e. when the downstream has also been notified of the sync
    savepoint by CP barriers).
---
 .../streaming/runtime/tasks/OperatorChain.java     | 10 ++-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  9 ++
 .../runtime/tasks/StreamOperatorWrapper.java       | 11 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  | 25 ++++--
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 97 +++++++++++++++++++++-
 .../tasks/TestBoundedOneInputStreamOperator.java   | 11 +++
 7 files changed, 150 insertions(+), 17 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 68514e8..8c59f75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -120,6 +120,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
 
     private final OperatorEventDispatcherImpl operatorEventDispatcher;
 
+    private boolean isStoppingBySyncSavepoint;
+
     /**
      * Current status of the input stream of the operator chain. Watermarks explicitly generated by
      * operators in the chain (i.e. timestamp assigner / watermark extractors), will be blocked and
@@ -408,7 +410,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
      */
     @Override
     public void endInput(int inputId) throws Exception {
-        if (mainOperatorWrapper != null) {
+        if (mainOperatorWrapper != null && !isStoppingBySyncSavepoint) {
             mainOperatorWrapper.endOperatorInput(inputId);
         }
     }
@@ -434,7 +436,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
      */
     protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
         if (firstOperatorWrapper != null) {
-            firstOperatorWrapper.close(actionExecutor);
+            firstOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
@@ -745,6 +747,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
     }
 
+    public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
+        this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
+    }
+
     /** Wrapper class to access the chained sources and their's outputs. */
     public static class ChainedSource {
         private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 2cb6902..b709299 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -172,7 +172,16 @@ public class SourceStreamTask<
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {
+                                mailboxProcessor.allActionsCompleted();
                             } else {
+                                // this is a "true" end of input regardless of whether
+                                // stop-with-savepoint was issued or not
+                                synchronized (lock) {
+                                    operatorChain.setIsStoppingBySyncSavepoint(false);
+                                }
                                 mailboxProcessor.allActionsCompleted();
                             }
                         });
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 8c15174..401bc89 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -39,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It
  * also automatically propagates the close operation to the next wrapper that the {@link #next}
  * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
- * close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the
- * header operator wrapper.
+ * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method
+ * of the header operator wrapper.
  */
 @Internal
 public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
@@ -120,8 +120,9 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
      * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
      * them.
      */
-    public void close(StreamTaskActionExecutor actionExecutor) throws Exception {
-        if (!isHead) {
+    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
+            throws Exception {
+        if (!isHead && !isStoppingBySyncSavepoint) {
             // NOTE: This only do for the case where the operator is one-input operator. At present,
             // any non-head operator on the operator chain is one-input operator.
             actionExecutor.runThrowing(() -> endOperatorInput(1));
@@ -131,7 +132,7 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 
         // propagate the close operation to the next wrapper
         if (next != null) {
-            next.close(actionExecutor);
+            next.close(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f3eefc..7252cad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -235,6 +235,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     private final ExecutorService channelIOExecutor;
 
     private Long syncSavepointId = null;
+    private Long activeSyncSavepointId = null;
 
     private long latestAsyncCheckpointStartDelayNanos;
 
@@ -427,7 +428,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                         new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
     }
 
-    private void resetSynchronousSavepointId() {
+    private void resetSynchronousSavepointId(long id, boolean succeeded) {
+        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
+            // allow to process further EndOfPartition events
+            activeSyncSavepointId = null;
+            operatorChain.setIsStoppingBySyncSavepoint(false);
+        }
         syncSavepointId = null;
     }
 
@@ -436,6 +442,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                 syncSavepointId == null,
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
         syncSavepointId = checkpointId;
+        activeSyncSavepointId = checkpointId;
+        operatorChain.setIsStoppingBySyncSavepoint(true);
     }
 
     @VisibleForTesting
@@ -988,6 +996,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     @Override
     public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
             throws IOException {
+        resetSynchronousSavepointId(checkpointId, false);
         subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
     }
 
@@ -1013,6 +1022,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                             if (advanceToEndOfTime) {
                                 advanceToEndOfEventTime();
                             }
+                        } else if (activeSyncSavepointId != null
+                                && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
+                            activeSyncSavepointId = null;
+                            operatorChain.setIsStoppingBySyncSavepoint(false);
                         }
 
                         subtaskCheckpointCoordinator.checkpointState(
@@ -1066,9 +1079,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     @Override
     public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
         return notifyCheckpointOperation(
-                () ->
-                        subtaskCheckpointCoordinator.notifyCheckpointAborted(
-                                checkpointId, operatorChain, this::isRunning),
+                () -> {
+                    resetSynchronousSavepointId(checkpointId, false);
+                    subtaskCheckpointCoordinator.notifyCheckpointAborted(
+                            checkpointId, operatorChain, this::isRunning);
+                },
                 String.format("checkpoint %d aborted", checkpointId));
     }
 
@@ -1097,7 +1112,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         if (isRunning && isSynchronousSavepointId(checkpointId)) {
             finishTask();
             // Reset to "notify" the internal synchronous savepoint mailbox loop.
-            resetSynchronousSavepointId();
+            resetSynchronousSavepointId(checkpointId, true);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index 2919470..640efe0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -133,7 +133,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
     @Test
     public void testClose() throws Exception {
         output.clear();
-        operatorWrappers.get(0).close(containingTask.getActionExecutor());
+        operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);
 
         List<Object> expected = new ArrayList<>();
         for (int i = 0; i < operatorWrappers.size(); i++) {
@@ -172,7 +172,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
                         true);
 
         try {
-            operatorWrapper.close(containingTask.getActionExecutor());
+            operatorWrapper.close(containingTask.getActionExecutor(), false);
             fail("should throw an exception");
         } catch (Throwable t) {
             Optional<Throwable> optional =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 53d91b6..75bdd20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -115,6 +115,7 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -151,7 +152,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.instanceOf;
@@ -182,12 +188,97 @@ public class StreamTaskTest extends TestLogger {
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
     @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) ->
+                        task.abortCheckpointOnBarrier(
+                                id,
+                                new CheckpointException(
+                                        UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)),
+                true);
+    }
+
+    @Test
+    public void testSyncSavepointAbortedAsync() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
+    }
+
+    /**
+     * Test for SyncSavepoint and EndInput interactions. Targets the following scenarios:
+     *
+     * <ol>
+     *   <li>Thread1: notify sync savepoint
+     *   <li>Thread2: endInput
+     *   <li>Thread1: confirm/abort/abortAsync
+     *   <li>assert inputEnded: confirmed - no, abort/abortAsync - yes
+     * </ol>
+     */
+    private void testSyncSavepointWithEndInput(
+            BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
+            boolean expectEndInput)
+            throws Exception {
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+                        .addInput(STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator())
+                        .build();
+
+        final long checkpointId = 1L;
+        CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
+        CountDownLatch inputEndedLatch = new CountDownLatch(1);
+
+        MailboxExecutor executor =
+                harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
+        executor.execute(
+                () -> {
+                    try {
+                        harness.streamTask.triggerCheckpointOnBarrier(
+                                new CheckpointMetaData(checkpointId, checkpointId),
+                                new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                                new CheckpointMetricsBuilder());
+                    } catch (IOException e) {
+                        fail(e.getMessage());
+                    }
+                },
+                "triggerCheckpointOnBarrier");
+        new Thread(
+                        () -> {
+                            try {
+                                savepointTriggeredLatch.await();
+                                harness.endInput();
+                                inputEndedLatch.countDown();
+                            } catch (InterruptedException e) {
+                                fail(e.getMessage());
+                            }
+                        })
+                .start();
+        // this mails should be executed from the one above (from triggerCheckpointOnBarrier)
+        executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
+        executor.execute(
+                () -> {
+                    inputEndedLatch.await();
+                    savepointResult.accept(harness.streamTask, checkpointId);
+                },
+                "savepointResult");
+
+        while (harness.streamTask.isMailboxLoopRunning()) {
+            harness.streamTask.runMailboxStep();
+        }
+
+        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+    }
+
+    @Test
     public void testCleanUpExceptionSuppressing() throws Exception {
         OneInputStreamTaskTestHarness<String, String> testHarness =
                 new OneInputStreamTaskTestHarness<>(
-                        OneInputStreamTask::new,
-                        BasicTypeInfo.STRING_TYPE_INFO,
-                        BasicTypeInfo.STRING_TYPE_INFO);
+                        OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
index 35d9266..b5f523d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
@@ -30,9 +30,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private static final long serialVersionUID = 1L;
 
     private final String name;
+    private static volatile boolean inputEnded = false;
+
+    public TestBoundedOneInputStreamOperator() {
+        this("test");
+    }
 
     public TestBoundedOneInputStreamOperator(String name) {
         this.name = name;
+        inputEnded = false;
     }
 
     @Override
@@ -42,6 +48,7 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
 
     @Override
     public void endInput() {
+        inputEnded = true;
         output("[" + name + "]: End of input");
     }
 
@@ -59,4 +66,8 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private void output(String record) {
         output.collect(new StreamRecord<>(record));
     }
+
+    public static boolean isInputEnded() {
+        return inputEnded;
+    }
 }


[flink] 03/05: [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ea6ec4a36599030063b0943e7ef34b4124b8ebb
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 29 16:51:53 2021 +0100

    [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy
---
 .../flink/test/checkpointing/SavepointITCase.java  | 58 ++++++++++++----------
 1 file changed, 32 insertions(+), 26 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index c818c8f..9a98838 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -377,8 +377,8 @@ public class SavepointITCase extends TestLogger {
 
         private transient boolean processed;
 
-        BoundedPassThroughOperator() {
-            chainingStrategy = ChainingStrategy.ALWAYS;
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
         }
 
         @Override
@@ -393,7 +393,6 @@ public class SavepointITCase extends TestLogger {
                 processed = true;
                 progressLatch.countDown();
             }
-            Thread.sleep(1000);
         }
 
         // --------------------------------------------------------------------
@@ -413,39 +412,46 @@ public class SavepointITCase extends TestLogger {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
 
-        final MiniClusterResourceFactory clusterFactory =
-                new MiniClusterResourceFactory(
-                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+        for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
 
-        BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
-        DataStream<Integer> stream =
-                env.addSource(new InfiniteTestSource())
-                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(chainingStrategy);
+            DataStream<Integer> stream =
+                    env.addSource(new InfiniteTestSource())
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
 
-        stream.addSink(new DiscardingSink<>());
+            stream.addSink(new DiscardingSink<>());
 
-        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-        final JobID jobId = jobGraph.getJobID();
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
 
-        MiniClusterWithClientResource cluster = clusterFactory.get();
-        cluster.before();
-        ClusterClient<?> client = cluster.getClusterClient();
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
 
-        try {
-            BoundedPassThroughOperator.resetForTest(1);
+            try {
+                BoundedPassThroughOperator.resetForTest(1);
 
-            client.submitJob(jobGraph).get();
+                client.submitJob(jobGraph).get();
 
-            BoundedPassThroughOperator.getProgressLatch().await();
+                BoundedPassThroughOperator.getProgressLatch().await();
 
-            client.stopWithSavepoint(jobId, false, null).get();
+                client.stopWithSavepoint(jobId, false, null).get();
 
-            Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
-        } finally {
-            cluster.after();
+                Assert.assertFalse(
+                        "input ended with chainingStrategy " + chainingStrategy,
+                        BoundedPassThroughOperator.inputEnded);
+            } finally {
+                cluster.after();
+            }
         }
     }
 


[flink] 02/05: [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1dda3c8f7e6e7adbd0e9f1d04124f53c312b461b
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Jan 23 23:10:22 2021 +0800

    [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input
---
 .../flink/test/checkpointing/SavepointITCase.java  | 85 ++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 7637b07..c818c8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -52,6 +53,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
@@ -364,6 +370,85 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator() {
+            chainingStrategy = ChainingStrategy.ALWAYS;
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+            Thread.sleep(1000);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism) {
+            progressLatch = new CountDownLatch(parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInput() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        final MiniClusterResourceFactory clusterFactory =
+                new MiniClusterResourceFactory(
+                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
+        DataStream<Integer> stream =
+                env.addSource(new InfiniteTestSource())
+                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+        stream.addSink(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        MiniClusterWithClientResource cluster = clusterFactory.get();
+        cluster.before();
+        ClusterClient<?> client = cluster.getClusterClient();
+
+        try {
+            BoundedPassThroughOperator.resetForTest(1);
+
+            client.submitJob(jobGraph).get();
+
+            BoundedPassThroughOperator.getProgressLatch().await();
+
+            client.stopWithSavepoint(jobId, false, null).get();
+
+            Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
+        } finally {
+            cluster.after();
+        }
+    }
+
     @Test
     public void testSubmitWithUnknownSavepointPath() throws Exception {
         // Config


[flink] 05/05: [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c19ab7dc6fc06e1130b26d94e2f72455db9c85b
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 2 19:34:39 2021 +0100

    [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally
---
 .../apache/flink/streaming/runtime/tasks/SourceStreamTask.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index b709299..4ebbd03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -66,7 +66,7 @@ public class SourceStreamTask<
      * Indicates whether this Task was purposefully finished (by finishTask()), in this case we want
      * to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
      */
-    private volatile boolean isFinished = false;
+    private volatile boolean wasStoppedExternally = false;
 
     public SourceStreamTask(Environment env) throws Exception {
         this(env, new Object());
@@ -170,11 +170,11 @@ public class SourceStreamTask<
                                             .isPresent()) {
                                 mailboxProcessor.reportThrowable(
                                         new CancelTaskException(sourceThreadThrowable));
-                            } else if (!isFinished && sourceThreadThrowable != null) {
+                            } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
                             } else if (sourceThreadThrowable != null
                                     || isCanceled()
-                                    || isFinished) {
+                                    || wasStoppedExternally) {
                                 mailboxProcessor.allActionsCompleted();
                             } else {
                                 // this is a "true" end of input regardless of whether
@@ -205,7 +205,7 @@ public class SourceStreamTask<
 
     @Override
     protected void finishTask() throws Exception {
-        isFinished = true;
+        wasStoppedExternally = true;
         cancelTask();
     }
 


[flink] 04/05: [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61a745b4fd0ccd4c7879b0c2e47008cf77cc0123
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 2 19:16:53 2021 +0100

    [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput
---
 .../flink/test/checkpointing/SavepointITCase.java  | 147 ++++++++++++++++++++-
 1 file changed, 141 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9a98838..6215dd3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -36,12 +36,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -86,16 +88,21 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
 import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -373,6 +380,8 @@ public class SavepointITCase extends TestLogger {
     static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
             implements OneInputStreamOperator<T, T>, BoundedOneInput {
         static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotAllowedLatch;
+        static volatile CountDownLatch snapshotStartedLatch;
         static volatile boolean inputEnded;
 
         private transient boolean processed;
@@ -381,6 +390,14 @@ public class SavepointITCase extends TestLogger {
             this.chainingStrategy = chainingStrategy;
         }
 
+        private static void allowSnapshots() {
+            snapshotAllowedLatch.countDown();
+        }
+
+        public static void awaitSnapshotStarted() throws InterruptedException {
+            snapshotStartedLatch.await();
+        }
+
         @Override
         public void endInput() throws Exception {
             inputEnded = true;
@@ -395,19 +412,95 @@ public class SavepointITCase extends TestLogger {
             }
         }
 
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotStartedLatch.countDown();
+            snapshotAllowedLatch.await();
+            super.snapshotState(context);
+        }
+
         // --------------------------------------------------------------------
 
         static CountDownLatch getProgressLatch() {
             return progressLatch;
         }
 
-        static void resetForTest(int parallelism) {
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
             progressLatch = new CountDownLatch(parallelism);
+            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            snapshotStartedLatch = new CountDownLatch(parallelism);
             inputEnded = false;
         }
     }
 
     @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                BoundedPassThroughOperator.awaitSnapshotStarted();
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
+                BoundedPassThroughOperator.allowSnapshots();
+                stop.get();
+                Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded);
+                return;
+            } catch (Exception e) {
+                // if sources and the whole job ends before the checkpoint completes
+                // then coordinator will shut down and savepoint will be aborted - retry
+                if (!isCheckpointCoordinatorShutdownError(e)) {
+                    throw e;
+                }
+            } finally {
+                cluster.after();
+            }
+        }
+    }
+
+    private static boolean isCheckpointCoordinatorShutdownError(Throwable throwable) {
+        return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
+                .filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
+                .isPresent();
+    }
+
+    @Test
     public void testStopSavepointWithBoundedInput() throws Exception {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
@@ -438,7 +531,7 @@ public class SavepointITCase extends TestLogger {
             ClusterClient<?> client = cluster.getClusterClient();
 
             try {
-                BoundedPassThroughOperator.resetForTest(1);
+                BoundedPassThroughOperator.resetForTest(1, true);
 
                 client.submitJob(jobGraph).get();
 
@@ -662,14 +755,31 @@ public class SavepointITCase extends TestLogger {
 
         private static final long serialVersionUID = 1L;
         private volatile boolean running = true;
+        private volatile boolean suspended = false;
+        private static final Collection<InfiniteTestSource> createdSources =
+                new CopyOnWriteArrayList<>();
+        private transient volatile CompletableFuture<Void> completeFuture;
+
+        public InfiniteTestSource() {
+            createdSources.add(this);
+        }
 
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect(1);
+            completeFuture = new CompletableFuture<>();
+            createdSources.add(this);
+            try {
+                while (running) {
+                    if (!suspended) {
+                        synchronized (ctx.getCheckpointLock()) {
+                            ctx.collect(1);
+                        }
+                    }
+                    Thread.sleep(1);
                 }
-                Thread.sleep(1);
+                completeFuture.complete(null);
+            } catch (Exception e) {
+                completeFuture.completeExceptionally(e);
             }
         }
 
@@ -677,6 +787,31 @@ public class SavepointITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+
+        public void suspend() {
+            suspended = true;
+        }
+
+        public static void resetForTest() {
+            createdSources.clear();
+        }
+
+        public CompletableFuture<Void> getCompleteFuture() {
+            return completeFuture;
+        }
+
+        public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
+            createdSources.forEach(InfiniteTestSource::cancel);
+            allOf(
+                            createdSources.stream()
+                                    .map(InfiniteTestSource::getCompleteFuture)
+                                    .toArray(CompletableFuture[]::new))
+                    .get();
+        }
+
+        public static void suspendAll() {
+            createdSources.forEach(InfiniteTestSource::suspend);
+        }
     }
 
     private static class StatefulCounter extends RichMapFunction<Integer, Integer>