You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/03 19:53:17 UTC

[flink] branch release-1.11 updated (ce03675 -> 299820a)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ce03675  [hotfix][clients] ClientUtils return URLClassLoader
     new ff3b9c3  [FLINK-21132][runtime] Don't end input on stop with savepoint
     new c030d31  [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input
     new df04d5a  [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy
     new a996fc0  [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput
     new 299820a  [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/runtime/tasks/OperatorChain.java     |  10 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  15 +-
 .../runtime/tasks/StreamOperatorWrapper.java       |  18 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  25 ++-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  91 +++++++-
 .../tasks/TestBoundedOneInputStreamOperator.java   |  11 +
 .../flink/test/checkpointing/SavepointITCase.java  | 234 ++++++++++++++++++++-
 8 files changed, 382 insertions(+), 26 deletions(-)


[flink] 05/05: [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 299820a33c1a44c15c113d4226b79052b04cd1b9
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 2 19:34:39 2021 +0100

    [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally
---
 .../apache/flink/streaming/runtime/tasks/SourceStreamTask.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1e6af41..a52404e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -64,7 +64,7 @@ public class SourceStreamTask<
      * Indicates whether this Task was purposefully finished (by finishTask()), in this case we want
      * to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
      */
-    private volatile boolean isFinished = false;
+    private volatile boolean wasStoppedExternally = false;
 
     public SourceStreamTask(Environment env) throws Exception {
         this(env, new Object());
@@ -163,11 +163,11 @@ public class SourceStreamTask<
                                             .isPresent()) {
                                 mailboxProcessor.reportThrowable(
                                         new CancelTaskException(sourceThreadThrowable));
-                            } else if (!isFinished && sourceThreadThrowable != null) {
+                            } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
                             } else if (sourceThreadThrowable != null
                                     || isCanceled()
-                                    || isFinished) {
+                                    || wasStoppedExternally) {
                                 mailboxProcessor.allActionsCompleted();
                             } else {
                                 // this is a "true" end of input regardless of whether
@@ -198,7 +198,7 @@ public class SourceStreamTask<
 
     @Override
     protected void finishTask() throws Exception {
-        isFinished = true;
+        wasStoppedExternally = true;
         cancelTask();
     }
 


[flink] 03/05: [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df04d5a63911af40a062ff765db8dc12400701a3
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 29 16:51:53 2021 +0100

    [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy
---
 .../flink/test/checkpointing/SavepointITCase.java  | 58 ++++++++++++----------
 1 file changed, 32 insertions(+), 26 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 46f9cd2..352d277 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -374,8 +374,8 @@ public class SavepointITCase extends TestLogger {
 
         private transient boolean processed;
 
-        BoundedPassThroughOperator() {
-            chainingStrategy = ChainingStrategy.ALWAYS;
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
         }
 
         @Override
@@ -390,7 +390,6 @@ public class SavepointITCase extends TestLogger {
                 processed = true;
                 progressLatch.countDown();
             }
-            Thread.sleep(1000);
         }
 
         // --------------------------------------------------------------------
@@ -410,39 +409,46 @@ public class SavepointITCase extends TestLogger {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
 
-        final MiniClusterResourceFactory clusterFactory =
-                new MiniClusterResourceFactory(
-                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+        for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
 
-        BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
-        DataStream<Integer> stream =
-                env.addSource(new InfiniteTestSource())
-                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(chainingStrategy);
+            DataStream<Integer> stream =
+                    env.addSource(new InfiniteTestSource())
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
 
-        stream.addSink(new DiscardingSink<>());
+            stream.addSink(new DiscardingSink<>());
 
-        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-        final JobID jobId = jobGraph.getJobID();
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
 
-        MiniClusterWithClientResource cluster = clusterFactory.get();
-        cluster.before();
-        ClusterClient<?> client = cluster.getClusterClient();
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
 
-        try {
-            BoundedPassThroughOperator.resetForTest(1);
+            try {
+                BoundedPassThroughOperator.resetForTest(1);
 
-            client.submitJob(jobGraph).get();
+                client.submitJob(jobGraph).get();
 
-            BoundedPassThroughOperator.getProgressLatch().await();
+                BoundedPassThroughOperator.getProgressLatch().await();
 
-            client.stopWithSavepoint(jobId, false, null).get();
+                client.stopWithSavepoint(jobId, false, null).get();
 
-            Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
-        } finally {
-            cluster.after();
+                Assert.assertFalse(
+                        "input ended with chainingStrategy " + chainingStrategy,
+                        BoundedPassThroughOperator.inputEnded);
+            } finally {
+                cluster.after();
+            }
         }
     }
 


[flink] 01/05: [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff3b9c3bd66ff95c8b94e280e4c80ed1e8acfc82
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 29 16:46:15 2021 +0100

    [FLINK-21132][runtime] Don't end input on stop with savepoint
    
    EndOfInput was used to handle any stopping of the job. When
    stopping with savepoint the input is not actually ended.
    This causes issues with some sinks (e.g. Iceberg).
    
    With this change, endInput is not called for stop-with-savepoint.
    
    To differentiate stop-with-savepoint from other cases,
    only checkpoints (RPC/barriers) are considered and not network EOP.
    That's enough because EOP is only injected after the CP completion
    (i.e. when the downstream is also notified of the sync savepoint by CP
    barriers).
---
 .../streaming/runtime/tasks/OperatorChain.java     | 10 ++-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  9 +++
 .../runtime/tasks/StreamOperatorWrapper.java       | 18 +++--
 .../flink/streaming/runtime/tasks/StreamTask.java  | 25 ++++--
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 91 +++++++++++++++++++++-
 .../tasks/TestBoundedOneInputStreamOperator.java   | 11 +++
 7 files changed, 149 insertions(+), 19 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 88e99cf..9af6320 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -105,6 +105,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
     private final OperatorEventDispatcherImpl operatorEventDispatcher;
 
+    private boolean isStoppingBySyncSavepoint;
+
     /**
      * Current status of the input stream of the operator chain. Watermarks explicitly generated by
      * operators in the chain (i.e. timestamp assigner / watermark extractors), will be blocked and
@@ -289,7 +291,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
      * @param inputId the input ID starts from 1 which indicates the first input.
      */
     public void endHeadOperatorInput(int inputId) throws Exception {
-        if (headOperatorWrapper != null) {
+        if (headOperatorWrapper != null && !isStoppingBySyncSavepoint) {
             headOperatorWrapper.endOperatorInput(inputId);
         }
     }
@@ -315,7 +317,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
      */
     protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
         if (headOperatorWrapper != null) {
-            headOperatorWrapper.close(actionExecutor);
+            headOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
@@ -580,6 +582,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
         return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
     }
 
+    public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
+        this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
+    }
+
     // ------------------------------------------------------------------------
     //  Collectors for output chaining
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 687a650..1e6af41 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -165,7 +165,16 @@ public class SourceStreamTask<
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {
+                                mailboxProcessor.allActionsCompleted();
                             } else {
+                                // this is a "true" end of input regardless of whether
+                                // stop-with-savepoint was issued or not
+                                synchronized (lock) {
+                                    operatorChain.setIsStoppingBySyncSavepoint(false);
+                                }
                                 mailboxProcessor.allActionsCompleted();
                             }
                         });
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 56cd074..9617500 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -39,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It
  * also automatically propagates the close operation to the next wrapper that the {@link #next}
  * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
- * close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the
- * header operator wrapper.
+ * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean,
+ * boolean)} method of the header operator wrapper.
  */
 @Internal
 public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
@@ -76,8 +76,9 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
      * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
      * them.
      */
-    public void close(StreamTaskActionExecutor actionExecutor) throws Exception {
-        close(actionExecutor, false);
+    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
+            throws Exception {
+        close(actionExecutor, false, isStoppingBySyncSavepoint);
     }
 
     /**
@@ -120,9 +121,12 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
         this.next = next;
     }
 
-    private void close(StreamTaskActionExecutor actionExecutor, boolean invokingEndInput)
+    private void close(
+            StreamTaskActionExecutor actionExecutor,
+            boolean invokingEndInput,
+            boolean isStoppingBySyncSavepoint)
             throws Exception {
-        if (invokingEndInput) {
+        if (invokingEndInput && !isStoppingBySyncSavepoint) {
             // NOTE: This only do for the case where the operator is one-input operator. At present,
             // any non-head operator on the operator chain is one-input operator.
             actionExecutor.runThrowing(() -> endOperatorInput(1));
@@ -132,7 +136,7 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 
         // propagate the close operation to the next wrapper
         if (next != null) {
-            next.close(actionExecutor, true);
+            next.close(actionExecutor, true, isStoppingBySyncSavepoint);
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4339024..4d7b3c7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -220,6 +220,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     private final ExecutorService channelIOExecutor;
 
     private Long syncSavepointId = null;
+    private Long activeSyncSavepointId = null;
 
     private long latestAsyncCheckpointStartDelayNanos;
 
@@ -399,7 +400,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         }
     }
 
-    private void resetSynchronousSavepointId() {
+    private void resetSynchronousSavepointId(long id, boolean succeeded) {
+        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
+            // allow to process further EndOfPartition events
+            activeSyncSavepointId = null;
+            operatorChain.setIsStoppingBySyncSavepoint(false);
+        }
         syncSavepointId = null;
     }
 
@@ -408,6 +414,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                 syncSavepointId == null,
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
         syncSavepointId = checkpointId;
+        activeSyncSavepointId = checkpointId;
+        operatorChain.setIsStoppingBySyncSavepoint(true);
     }
 
     @VisibleForTesting
@@ -962,6 +970,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     @Override
     public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
+        resetSynchronousSavepointId(checkpointId, false);
         subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
     }
 
@@ -987,6 +996,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                             if (advanceToEndOfTime) {
                                 advanceToEndOfEventTime();
                             }
+                        } else if (activeSyncSavepointId != null
+                                && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
+                            activeSyncSavepointId = null;
+                            operatorChain.setIsStoppingBySyncSavepoint(false);
                         }
 
                         subtaskCheckpointCoordinator.checkpointState(
@@ -1040,9 +1053,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     @Override
     public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
         return notifyCheckpointOperation(
-                () ->
-                        subtaskCheckpointCoordinator.notifyCheckpointAborted(
-                                checkpointId, operatorChain, this::isRunning),
+                () -> {
+                    resetSynchronousSavepointId(checkpointId, false);
+                    subtaskCheckpointCoordinator.notifyCheckpointAborted(
+                            checkpointId, operatorChain, this::isRunning);
+                },
                 String.format("checkpoint %d aborted", checkpointId));
     }
 
@@ -1071,7 +1086,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         if (isRunning && isSynchronousSavepointId(checkpointId)) {
             finishTask();
             // Reset to "notify" the internal synchronous savepoint mailbox loop.
-            resetSynchronousSavepointId();
+            resetSynchronousSavepointId(checkpointId, true);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index 6f4615c..f1549dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -132,7 +132,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
     @Test
     public void testClose() throws Exception {
         output.clear();
-        operatorWrappers.get(0).close(containingTask.getActionExecutor());
+        operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);
 
         List<Object> expected = new ArrayList<>();
         for (int i = 0; i < operatorWrappers.size(); i++) {
@@ -170,7 +170,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
                                 .createExecutor(Integer.MAX_VALUE - 1));
 
         try {
-            operatorWrapper.close(containingTask.getActionExecutor());
+            operatorWrapper.close(containingTask.getActionExecutor(), false);
             fail("should throw an exception");
         } catch (Throwable t) {
             Optional<Throwable> optional =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 38f5ec3..bfd7d54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -117,6 +117,7 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -152,7 +153,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -185,12 +190,92 @@ public class StreamTaskTest extends TestLogger {
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
     @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+    }
+
+    @Test
+    public void testSyncSavepointAbortedAsync() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
+    }
+
+    /**
+     * Test for SyncSavepoint and EndInput interactions. Targets the following scenarios:
+     *
+     * <ol>
+     *   <li>Thread1: notify sync savepoint
+     *   <li>Thread2: endInput
+     *   <li>Thread1: confirm/abort/abortAsync
+     *   <li>assert inputEnded: confirmed - no, abort/abortAsync - yes
+     * </ol>
+     */
+    private void testSyncSavepointWithEndInput(
+            BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
+            boolean expectEndInput)
+            throws Exception {
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+                        .addInput(STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator())
+                        .build();
+
+        final long checkpointId = 1L;
+        CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
+        CountDownLatch inputEndedLatch = new CountDownLatch(1);
+
+        MailboxExecutor executor =
+                harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
+        executor.execute(
+                () -> {
+                    try {
+                        harness.streamTask.triggerCheckpointOnBarrier(
+                                new CheckpointMetaData(checkpointId, checkpointId),
+                                new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                                new CheckpointMetrics());
+                    } catch (IOException e) {
+                        fail(e.getMessage());
+                    }
+                },
+                "triggerCheckpointOnBarrier");
+        new Thread(
+                        () -> {
+                            try {
+                                savepointTriggeredLatch.await();
+                                harness.endInput();
+                                inputEndedLatch.countDown();
+                            } catch (InterruptedException e) {
+                                fail(e.getMessage());
+                            }
+                        })
+                .start();
+        // these mails should be executed from the one above (from triggerCheckpointOnBarrier)
+        executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
+        executor.execute(
+                () -> {
+                    inputEndedLatch.await();
+                    savepointResult.accept(harness.streamTask, checkpointId);
+                },
+                "savepointResult");
+
+        while (harness.streamTask.isMailboxLoopRunning()) {
+            harness.streamTask.runMailboxStep();
+        }
+
+        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+    }
+
+    @Test
     public void testCleanUpExceptionSuppressing() throws Exception {
         OneInputStreamTaskTestHarness<String, String> testHarness =
                 new OneInputStreamTaskTestHarness<>(
-                        OneInputStreamTask::new,
-                        BasicTypeInfo.STRING_TYPE_INFO,
-                        BasicTypeInfo.STRING_TYPE_INFO);
+                        OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
index 35d9266..b5f523d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
@@ -30,9 +30,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private static final long serialVersionUID = 1L;
 
     private final String name;
+    private static volatile boolean inputEnded = false;
+
+    public TestBoundedOneInputStreamOperator() {
+        this("test");
+    }
 
     public TestBoundedOneInputStreamOperator(String name) {
         this.name = name;
+        inputEnded = false;
     }
 
     @Override
@@ -42,6 +48,7 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
 
     @Override
     public void endInput() {
+        inputEnded = true;
         output("[" + name + "]: End of input");
     }
 
@@ -59,4 +66,8 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private void output(String record) {
         output.collect(new StreamRecord<>(record));
     }
+
+    public static boolean isInputEnded() {
+        return inputEnded;
+    }
 }


[flink] 02/05: [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c030d316caca44912724b4f42c3cd04fac9929a5
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Jan 23 23:10:22 2021 +0800

    [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input
---
 .../flink/test/checkpointing/SavepointITCase.java  | 85 ++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 25597c6..46f9cd2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
@@ -54,6 +55,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
 import org.apache.flink.util.Collector;
@@ -361,6 +367,85 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator() {
+            chainingStrategy = ChainingStrategy.ALWAYS;
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+            Thread.sleep(1000);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism) {
+            progressLatch = new CountDownLatch(parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInput() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        final MiniClusterResourceFactory clusterFactory =
+                new MiniClusterResourceFactory(
+                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
+        DataStream<Integer> stream =
+                env.addSource(new InfiniteTestSource())
+                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+        stream.addSink(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        MiniClusterWithClientResource cluster = clusterFactory.get();
+        cluster.before();
+        ClusterClient<?> client = cluster.getClusterClient();
+
+        try {
+            BoundedPassThroughOperator.resetForTest(1);
+
+            client.submitJob(jobGraph).get();
+
+            BoundedPassThroughOperator.getProgressLatch().await();
+
+            client.stopWithSavepoint(jobId, false, null).get();
+
+            Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
+        } finally {
+            cluster.after();
+        }
+    }
+
     @Test
     public void testSubmitWithUnknownSavepointPath() throws Exception {
         // Config


[flink] 04/05: [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a996fc0d20ac16592d7b8ff7485dd26430fc1d8f
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 2 19:16:53 2021 +0100

    [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput
---
 .../flink/test/checkpointing/SavepointITCase.java  | 147 ++++++++++++++++++++-
 1 file changed, 141 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 352d277..a0e69d8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -38,12 +38,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -87,16 +89,21 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -370,6 +377,8 @@ public class SavepointITCase extends TestLogger {
     static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
             implements OneInputStreamOperator<T, T>, BoundedOneInput {
         static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotAllowedLatch;
+        static volatile CountDownLatch snapshotStartedLatch;
         static volatile boolean inputEnded;
 
         private transient boolean processed;
@@ -378,6 +387,14 @@ public class SavepointITCase extends TestLogger {
             this.chainingStrategy = chainingStrategy;
         }
 
+        private static void allowSnapshots() {
+            snapshotAllowedLatch.countDown();
+        }
+
+        public static void awaitSnapshotStarted() throws InterruptedException {
+            BoundedPassThroughOperator.snapshotStartedLatch.await();
+        }
+
         @Override
         public void endInput() throws Exception {
             inputEnded = true;
@@ -392,19 +409,95 @@ public class SavepointITCase extends TestLogger {
             }
         }
 
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotStartedLatch.countDown();
+            snapshotAllowedLatch.await();
+            super.snapshotState(context);
+        }
+
         // --------------------------------------------------------------------
 
         static CountDownLatch getProgressLatch() {
             return progressLatch;
         }
 
-        static void resetForTest(int parallelism) {
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
             progressLatch = new CountDownLatch(parallelism);
+            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            snapshotStartedLatch = new CountDownLatch(parallelism);
             inputEnded = false;
         }
     }
 
     @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                BoundedPassThroughOperator.awaitSnapshotStarted();
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
+                BoundedPassThroughOperator.allowSnapshots();
+                stop.get();
+                Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded);
+                return;
+            } catch (Exception e) {
+                // if sources and the whole job ends before the checkpoint completes
+                // then coordinator will shut down and savepoint will be aborted - retry
+                if (!ischeckpointcoordinatorshutdownError(e)) {
+                    throw e;
+                }
+            } finally {
+                cluster.after();
+            }
+        }
+    }
+
+    private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
+        return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
+                .filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
+                .isPresent();
+    }
+
+    @Test
     public void testStopSavepointWithBoundedInput() throws Exception {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
@@ -435,7 +528,7 @@ public class SavepointITCase extends TestLogger {
             ClusterClient<?> client = cluster.getClusterClient();
 
             try {
-                BoundedPassThroughOperator.resetForTest(1);
+                BoundedPassThroughOperator.resetForTest(1, true);
 
                 client.submitJob(jobGraph).get();
 
@@ -661,14 +754,31 @@ public class SavepointITCase extends TestLogger {
 
         private static final long serialVersionUID = 1L;
         private volatile boolean running = true;
+        private volatile boolean suspended = false;
+        private static final Collection<InfiniteTestSource> createdSources =
+                new CopyOnWriteArrayList<>();
+        private transient volatile CompletableFuture<Void> completeFuture;
+
+        public InfiniteTestSource() {
+            createdSources.add(this);
+        }
 
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect(1);
+            completeFuture = new CompletableFuture<>();
+            createdSources.add(this);
+            try {
+                while (running) {
+                    if (!suspended) {
+                        synchronized (ctx.getCheckpointLock()) {
+                            ctx.collect(1);
+                        }
+                    }
+                    Thread.sleep(1);
                 }
-                Thread.sleep(1);
+                completeFuture.complete(null);
+            } catch (Exception e) {
+                completeFuture.completeExceptionally(e);
             }
         }
 
@@ -676,6 +786,31 @@ public class SavepointITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+
+        public void suspend() {
+            suspended = true;
+        }
+
+        public static void resetForTest() {
+            createdSources.clear();
+        }
+
+        public CompletableFuture<Void> getCompleteFuture() {
+            return completeFuture;
+        }
+
+        public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
+            createdSources.forEach(InfiniteTestSource::cancel);
+            allOf(
+                            createdSources.stream()
+                                    .map(InfiniteTestSource::getCompleteFuture)
+                                    .toArray(CompletableFuture[]::new))
+                    .get();
+        }
+
+        public static void suspendAll() {
+            createdSources.forEach(InfiniteTestSource::suspend);
+        }
     }
 
     private static class StatefulCounter extends RichMapFunction<Integer, Integer>