You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/11 14:57:49 UTC

[flink] branch release-1.12 updated (20030c2 -> 982dc7f)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 20030c2  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
     new b452632  [hotfix][tests] Import static constant in SourceStreamTaskTest
     new 982dc7f  [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/api/operators/StreamSource.java      |  1 +
 .../streaming/runtime/tasks/SourceStreamTask.java  | 14 ++---
 .../runtime/tasks/SourceStreamTaskTest.java        | 67 ++++++++++++++++-----
 .../flink/test/checkpointing/SavepointITCase.java  | 68 +---------------------
 4 files changed, 59 insertions(+), 91 deletions(-)


[flink] 02/02: [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 982dc7f803e1ab56276f1db94e3d2e7987c33993
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sun Feb 7 21:57:12 2021 +0100

    [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread
    
    1. Unset IsStoppingBySyncSavepoint from the LegacySourceThread
    
    2. Also unset it in StreamSource so that endInput is consistent
    for head and tail operators
    
    3. Replace ITCase with a lower level test.
    
    The legacy source thread needs the checkpoint lock after it exits run()
    and before it resets OperatorChain.StoppingBySyncSavepoint.
    This creates a race condition because that lock is held by
    the task thread performing the sync-savepoint.
    
    On a lower level there is no such problem because there is
    no checkpoint completion notification.
    
    4. Fix init of SavepointITCase.testStopSavepointWithBoundedInput
    - Only one (main) thread counts down BoundedPassThroughOperator.snapshotAllowedLatch so it is set to 1
    - InfiniteTestSource.createdSources is updated from run()
    - Propagate the exception from InfiniteTestSource.run()
---
 .../streaming/api/operators/StreamSource.java      |  1 +
 .../streaming/runtime/tasks/SourceStreamTask.java  | 14 ++---
 .../runtime/tasks/SourceStreamTaskTest.java        | 36 ++++++++++++
 .../flink/test/checkpointing/SavepointITCase.java  | 68 +---------------------
 4 files changed, 44 insertions(+), 75 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 084fff4..6067dff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -119,6 +119,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                 // interface,
                 // so we still need the following call to end the input
                 synchronized (lockingObject) {
+                    operatorChain.setIsStoppingBySyncSavepoint(false);
                     operatorChain.endInput(1);
                 }
             }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 59ba2d4..6c9bf23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -166,16 +166,7 @@ public class SourceStreamTask<
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
-                            } else if (sourceThreadThrowable != null
-                                    || isCanceled()
-                                    || wasStoppedExternally) {
-                                mailboxProcessor.allActionsCompleted();
                             } else {
-                                // this is a "true" end of input regardless of whether
-                                // stop-with-savepoint was issued or not
-                                synchronized (lock) {
-                                    operatorChain.setIsStoppingBySyncSavepoint(false);
-                                }
                                 mailboxProcessor.allActionsCompleted();
                             }
                         });
@@ -248,6 +239,11 @@ public class SourceStreamTask<
         public void run() {
             try {
                 mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
+                if (!wasStoppedExternally && !isCanceled()) {
+                    synchronized (lock) {
+                        operatorChain.setIsStoppingBySyncSavepoint(false);
+                    }
+                }
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9bcbf40..fa95939 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -74,6 +74,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -87,6 +89,40 @@ import static org.junit.Assert.assertTrue;
  */
 public class SourceStreamTaskTest {
 
+    @Test
+    public void testInputEndedBeforeStopWithSavepointConfirmed() throws Exception {
+        CancelTestSource source =
+                new CancelTestSource(
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "src");
+        TestBoundedOneInputStreamOperator chainTail = new TestBoundedOneInputStreamOperator("t");
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOperatorChain(
+                                new OperatorID(),
+                                new StreamSource<String, CancelTestSource>(source))
+                        .chain(
+                                new OperatorID(),
+                                chainTail,
+                                STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .finish()
+                        .build();
+        Future<Boolean> triggerFuture =
+                harness.streamTask.triggerCheckpointAsync(
+                        new CheckpointMetaData(1, 1),
+                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                        false);
+        while (!triggerFuture.isDone()) {
+            harness.streamTask.runMailboxStep();
+        }
+        // instead of completing stop with savepoint via `notifyCheckpointCompleted`
+        // we simulate that source has finished first. As a result, we expect that the endInput
+        // should have been issued
+        source.cancel();
+        harness.streamTask.invoke();
+        harness.waitForTaskCompletion();
+        assertTrue(TestBoundedOneInputStreamOperator.isInputEnded());
+    }
+
     /** This test verifies that open() and close() are correctly called by the StreamTask. */
     @Test
     public void testOpenClose() throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 6215dd3..a29c62b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -427,73 +427,12 @@ public class SavepointITCase extends TestLogger {
 
         static void resetForTest(int parallelism, boolean allowSnapshots) {
             progressLatch = new CountDownLatch(parallelism);
-            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : 1);
             snapshotStartedLatch = new CountDownLatch(parallelism);
             inputEnded = false;
         }
     }
 
-    @Test
-    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
-        final int numTaskManagers = 2;
-        final int numSlotsPerTaskManager = 2;
-
-        while (true) {
-
-            final MiniClusterResourceFactory clusterFactory =
-                    new MiniClusterResourceFactory(
-                            numTaskManagers,
-                            numSlotsPerTaskManager,
-                            getFileBasedCheckpointsConfig());
-
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-            env.setParallelism(1);
-
-            // It's only possible to test this with chaining. Without it, JM fails the job before
-            // the downstream gets the abort notification
-            BoundedPassThroughOperator<Integer> operator =
-                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
-            InfiniteTestSource source = new InfiniteTestSource();
-            DataStream<Integer> stream =
-                    env.addSource(source)
-                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
-
-            stream.addSink(new DiscardingSink<>());
-
-            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-            final JobID jobId = jobGraph.getJobID();
-
-            MiniClusterWithClientResource cluster = clusterFactory.get();
-            cluster.before();
-            ClusterClient<?> client = cluster.getClusterClient();
-
-            try {
-                BoundedPassThroughOperator.resetForTest(1, false);
-                InfiniteTestSource.resetForTest();
-
-                client.submitJob(jobGraph).get();
-
-                BoundedPassThroughOperator.getProgressLatch().await();
-                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
-                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
-                BoundedPassThroughOperator.awaitSnapshotStarted();
-                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
-                BoundedPassThroughOperator.allowSnapshots();
-                stop.get();
-                Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded);
-                return;
-            } catch (Exception e) {
-                // if sources and the whole job ends before the checkpoint completes
-                // then coordinator will shut down and savepoint will be aborted - retry
-                if (!ischeckpointcoordinatorshutdownError(e)) {
-                    throw e;
-                }
-            } finally {
-                cluster.after();
-            }
-        }
-    }
-
     private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
         return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
                 .filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
@@ -760,10 +699,6 @@ public class SavepointITCase extends TestLogger {
                 new CopyOnWriteArrayList<>();
         private transient volatile CompletableFuture<Void> completeFuture;
 
-        public InfiniteTestSource() {
-            createdSources.add(this);
-        }
-
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
             completeFuture = new CompletableFuture<>();
@@ -780,6 +715,7 @@ public class SavepointITCase extends TestLogger {
                 completeFuture.complete(null);
             } catch (Exception e) {
                 completeFuture.completeExceptionally(e);
+                throw e;
             }
         }
 


[flink] 01/02: [hotfix][tests] Import static constant in SourceStreamTaskTest

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4526326a4030cfd59479b09e7abe541a421b85a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 10 14:37:17 2021 +0100

    [hotfix][tests] Import static constant in SourceStreamTaskTest
---
 .../runtime/tasks/SourceStreamTaskTest.java        | 31 +++++++++++-----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d7cb5a7..9bcbf40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -73,6 +73,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -90,7 +91,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testOpenClose() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
@@ -113,8 +114,7 @@ public class SourceStreamTaskTest {
     public void testStartDelayMetric() throws Exception {
         long sleepTime = 42;
         StreamTaskMailboxTestHarnessBuilder<String> builder =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                        SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final Map<String, Metric> metrics = new ConcurrentHashMap<>();
         final TaskMetricGroup taskMetricGroup =
@@ -124,7 +124,7 @@ public class SourceStreamTaskTest {
                 builder.setupOutputForSingletonOperatorChain(
                                 new StreamSource<>(
                                         new CancelTestSource(
-                                                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                                STRING_TYPE_INFO.createSerializer(
                                                         new ExecutionConfig()),
                                                 "Hello")))
                         .setTaskMetricGroup(taskMetricGroup)
@@ -227,7 +227,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testClosingAllOperatorsOnChainProperly() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
@@ -238,7 +238,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -263,20 +263,19 @@ public class SourceStreamTaskTest {
     @Test
     public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
                         new OperatorID(),
                         new StreamSource<>(
                                 new CancelTestSource(
-                                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
-                                                new ExecutionConfig()),
+                                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                         "Hello")))
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -331,7 +330,7 @@ public class SourceStreamTaskTest {
     public void testCancellationWithSourceBlockedOnLock(
             boolean withPendingMail, boolean throwInCancel) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -341,7 +340,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -444,7 +443,7 @@ public class SourceStreamTaskTest {
     private void testInterruptionExceptionNotSwallowed(
             InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -454,7 +453,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -498,7 +497,7 @@ public class SourceStreamTaskTest {
     @Test
     public void finishingIgnoresExceptions() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<>();
         ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
@@ -518,7 +517,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testWaitsForSourceThreadOnCancel() throws Exception {
         StreamTaskTestHarness<String> harness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         harness.setupOutputForSingletonOperatorChain();
         harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));