You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/03 19:53:02 UTC

[flink] 01/05: [FLINK-21132][runtime] Don't end input on stop with savepoint

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3ffbbdd2e3c92bfb167f2c610bded574aba9532
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Jan 29 16:46:15 2021 +0100

    [FLINK-21132][runtime] Don't end input on stop with savepoint
    
    EndOfInput was used to handle any stopping of the job. When
    stopping with savepoint the input is not actually ended.
    This causes issues with some sinks (e.g. Iceberg).
    
    With this change, endInput is not called for stop-with-savepoint.
    
    To differentiate stop-with-savepoint from other cases,
    only checkpoints (RPC/barriers) are considered and not network EOP.
    That's enough because EOP is only injected after the CP completion
    (i.e. when the downstream has also been notified of the sync savepoint
    by CP barriers).
---
 .../streaming/runtime/tasks/OperatorChain.java     | 10 ++-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  9 +++
 .../runtime/tasks/StreamOperatorWrapper.java       | 11 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  | 25 ++++--
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 91 +++++++++++++++++++++-
 .../tasks/TestBoundedOneInputStreamOperator.java   | 11 +++
 7 files changed, 144 insertions(+), 17 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 68514e8..8c59f75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -120,6 +120,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
 
     private final OperatorEventDispatcherImpl operatorEventDispatcher;
 
+    private boolean isStoppingBySyncSavepoint;
+
     /**
      * Current status of the input stream of the operator chain. Watermarks explicitly generated by
      * operators in the chain (i.e. timestamp assigner / watermark extractors), will be blocked and
@@ -408,7 +410,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
      */
     @Override
     public void endInput(int inputId) throws Exception {
-        if (mainOperatorWrapper != null) {
+        if (mainOperatorWrapper != null && !isStoppingBySyncSavepoint) {
             mainOperatorWrapper.endOperatorInput(inputId);
         }
     }
@@ -434,7 +436,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
      */
     protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
         if (firstOperatorWrapper != null) {
-            firstOperatorWrapper.close(actionExecutor);
+            firstOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
@@ -745,6 +747,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
     }
 
+    public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
+        this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
+    }
+
     /** Wrapper class to access the chained sources and their's outputs. */
     public static class ChainedSource {
         private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e3a1f28..1017d5f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -166,7 +166,16 @@ public class SourceStreamTask<
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {
+                                mailboxProcessor.allActionsCompleted();
                             } else {
+                                // this is a "true" end of input regardless of whether
+                                // stop-with-savepoint was issued or not
+                                synchronized (lock) {
+                                    operatorChain.setIsStoppingBySyncSavepoint(false);
+                                }
                                 mailboxProcessor.allActionsCompleted();
                             }
                         });
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 8c15174..401bc89 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -39,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It
  * also automatically propagates the close operation to the next wrapper that the {@link #next}
  * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
- * close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the
- * header operator wrapper.
+ * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method
+ * of the header operator wrapper.
  */
 @Internal
 public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
@@ -120,8 +120,9 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
      * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
      * them.
      */
-    public void close(StreamTaskActionExecutor actionExecutor) throws Exception {
-        if (!isHead) {
+    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
+            throws Exception {
+        if (!isHead && !isStoppingBySyncSavepoint) {
             // NOTE: This only do for the case where the operator is one-input operator. At present,
             // any non-head operator on the operator chain is one-input operator.
             actionExecutor.runThrowing(() -> endOperatorInput(1));
@@ -131,7 +132,7 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 
         // propagate the close operation to the next wrapper
         if (next != null) {
-            next.close(actionExecutor);
+            next.close(actionExecutor, isStoppingBySyncSavepoint);
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4ec9515..18f636d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -226,6 +226,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     private final ExecutorService channelIOExecutor;
 
     private Long syncSavepointId = null;
+    private Long activeSyncSavepointId = null;
 
     private long latestAsyncCheckpointStartDelayNanos;
 
@@ -421,7 +422,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         }
     }
 
-    private void resetSynchronousSavepointId() {
+    private void resetSynchronousSavepointId(long id, boolean succeeded) {
+        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
+            // allow to process further EndOfPartition events
+            activeSyncSavepointId = null;
+            operatorChain.setIsStoppingBySyncSavepoint(false);
+        }
         syncSavepointId = null;
     }
 
@@ -430,6 +436,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                 syncSavepointId == null,
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
         syncSavepointId = checkpointId;
+        activeSyncSavepointId = checkpointId;
+        operatorChain.setIsStoppingBySyncSavepoint(true);
     }
 
     @VisibleForTesting
@@ -973,6 +981,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     @Override
     public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
+        resetSynchronousSavepointId(checkpointId, false);
         subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
     }
 
@@ -998,6 +1007,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                             if (advanceToEndOfTime) {
                                 advanceToEndOfEventTime();
                             }
+                        } else if (activeSyncSavepointId != null
+                                && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
+                            activeSyncSavepointId = null;
+                            operatorChain.setIsStoppingBySyncSavepoint(false);
                         }
 
                         subtaskCheckpointCoordinator.checkpointState(
@@ -1051,9 +1064,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     @Override
     public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
         return notifyCheckpointOperation(
-                () ->
-                        subtaskCheckpointCoordinator.notifyCheckpointAborted(
-                                checkpointId, operatorChain, this::isRunning),
+                () -> {
+                    resetSynchronousSavepointId(checkpointId, false);
+                    subtaskCheckpointCoordinator.notifyCheckpointAborted(
+                            checkpointId, operatorChain, this::isRunning);
+                },
                 String.format("checkpoint %d aborted", checkpointId));
     }
 
@@ -1082,7 +1097,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         if (isRunning && isSynchronousSavepointId(checkpointId)) {
             finishTask();
             // Reset to "notify" the internal synchronous savepoint mailbox loop.
-            resetSynchronousSavepointId();
+            resetSynchronousSavepointId(checkpointId, true);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index 2919470..640efe0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -133,7 +133,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
     @Test
     public void testClose() throws Exception {
         output.clear();
-        operatorWrappers.get(0).close(containingTask.getActionExecutor());
+        operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);
 
         List<Object> expected = new ArrayList<>();
         for (int i = 0; i < operatorWrappers.size(); i++) {
@@ -172,7 +172,7 @@ public class StreamOperatorWrapperTest extends TestLogger {
                         true);
 
         try {
-            operatorWrapper.close(containingTask.getActionExecutor());
+            operatorWrapper.close(containingTask.getActionExecutor(), false);
             fail("should throw an exception");
         } catch (Throwable t) {
             Optional<Throwable> optional =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6706dd8..c1a1883 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -113,6 +113,7 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -148,7 +149,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.instanceOf;
@@ -179,12 +184,92 @@ public class StreamTaskTest extends TestLogger {
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
     @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+    }
+
+    @Test
+    public void testSyncSavepointAbortedAsync() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
+    }
+
+    /**
+     * Test for SyncSavepoint and EndInput interactions. Targets the following scenarios:
+     *
+     * <ol>
+     *   <li>Thread1: notify sync savepoint
+     *   <li>Thread2: endInput
+     *   <li>Thread1: confirm/abort/abortAsync
+     *   <li>assert inputEnded: confirmed - no, abort/abortAsync - yes
+     * </ol>
+     */
+    private void testSyncSavepointWithEndInput(
+            BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
+            boolean expectEndInput)
+            throws Exception {
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+                        .addInput(STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator())
+                        .build();
+
+        final long checkpointId = 1L;
+        CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
+        CountDownLatch inputEndedLatch = new CountDownLatch(1);
+
+        MailboxExecutor executor =
+                harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
+        executor.execute(
+                () -> {
+                    try {
+                        harness.streamTask.triggerCheckpointOnBarrier(
+                                new CheckpointMetaData(checkpointId, checkpointId),
+                                new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                                new CheckpointMetricsBuilder());
+                    } catch (IOException e) {
+                        fail(e.getMessage());
+                    }
+                },
+                "triggerCheckpointOnBarrier");
+        new Thread(
+                        () -> {
+                            try {
+                                savepointTriggeredLatch.await();
+                                harness.endInput();
+                                inputEndedLatch.countDown();
+                            } catch (InterruptedException e) {
+                                fail(e.getMessage());
+                            }
+                        })
+                .start();
+        // these mails should be executed from the one above (from triggerCheckpointOnBarrier)
+        executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
+        executor.execute(
+                () -> {
+                    inputEndedLatch.await();
+                    savepointResult.accept(harness.streamTask, checkpointId);
+                },
+                "savepointResult");
+
+        while (harness.streamTask.isMailboxLoopRunning()) {
+            harness.streamTask.runMailboxStep();
+        }
+
+        Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
+    }
+
+    @Test
     public void testCleanUpExceptionSuppressing() throws Exception {
         OneInputStreamTaskTestHarness<String, String> testHarness =
                 new OneInputStreamTaskTestHarness<>(
-                        OneInputStreamTask::new,
-                        BasicTypeInfo.STRING_TYPE_INFO,
-                        BasicTypeInfo.STRING_TYPE_INFO);
+                        OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
index 35d9266..b5f523d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java
@@ -30,9 +30,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private static final long serialVersionUID = 1L;
 
     private final String name;
+    private static volatile boolean inputEnded = false;
+
+    public TestBoundedOneInputStreamOperator() {
+        this("test");
+    }
 
     public TestBoundedOneInputStreamOperator(String name) {
         this.name = name;
+        inputEnded = false;
     }
 
     @Override
@@ -42,6 +48,7 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
 
     @Override
     public void endInput() {
+        inputEnded = true;
         output("[" + name + "]: End of input");
     }
 
@@ -59,4 +66,8 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
     private void output(String record) {
         output.collect(new StreamRecord<>(record));
     }
+
+    public static boolean isInputEnded() {
+        return inputEnded;
+    }
 }