You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/11 14:57:37 UTC

[flink] branch release-1.11 updated (9984b58 -> 3ea8653)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9984b58  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
     new d342679  [hotfix][tests] Import static constant in SourceStreamTaskTest
     new 3ea8653  [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/api/operators/StreamSource.java      |  1 +
 .../streaming/runtime/tasks/SourceStreamTask.java  | 14 ++---
 .../runtime/tasks/SourceStreamTaskTest.java        | 71 +++++++++++++++++-----
 .../flink/test/checkpointing/SavepointITCase.java  | 68 +--------------------
 4 files changed, 63 insertions(+), 91 deletions(-)


[flink] 02/02: [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ea8653a09dfaf7b09f1f5cae997cc480b68fc36
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sun Feb 7 21:57:12 2021 +0100

    [FLINK-21312][checkpointing] Unset IsStoppingBySyncSavepoint from the LegacySourceThread
    
    1. Unset IsStoppingBySyncSavepoint from the LegacySourceThread
    
    2. Also unset it in StreamSource so that endInput is consistent
    for head and tail operators
    
    3. Replace ITCase with a lower level test.
    
    The legacy source thread needs the checkpoint lock after it exits run()
    and before it resets OperatorChain.IsStoppingBySyncSavepoint.
    This creates a race condition, because that lock is held by
    the task thread performing the sync savepoint.
    
    At a lower level there is no such problem, because there is
    no checkpoint completion notification.
    
    4. Fix the initialization of SavepointITCase.testStopSavepointWithBoundedInput:
     - Only one (main) thread counts down BoundedPassThroughOperator.snapshotAllowedLatch, so its count is set to 1
     - InfiniteTestSource.createdSources is updated from run() instead of from the constructor
     - Propagate the exception from InfiniteTestSource.run()
---
 .../streaming/api/operators/StreamSource.java      |  1 +
 .../streaming/runtime/tasks/SourceStreamTask.java  | 14 ++---
 .../runtime/tasks/SourceStreamTaskTest.java        | 40 +++++++++++++
 .../flink/test/checkpointing/SavepointITCase.java  | 68 +---------------------
 4 files changed, 48 insertions(+), 75 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 585f322..30ade24 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -119,6 +119,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                 // interface,
                 // so we still need the following call to end the input
                 synchronized (lockingObject) {
+                    operatorChain.setIsStoppingBySyncSavepoint(false);
                     operatorChain.endHeadOperatorInput(1);
                 }
             }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index a52404e..f407e08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -165,16 +165,7 @@ public class SourceStreamTask<
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
-                            } else if (sourceThreadThrowable != null
-                                    || isCanceled()
-                                    || wasStoppedExternally) {
-                                mailboxProcessor.allActionsCompleted();
                             } else {
-                                // this is a "true" end of input regardless of whether
-                                // stop-with-savepoint was issued or not
-                                synchronized (lock) {
-                                    operatorChain.setIsStoppingBySyncSavepoint(false);
-                                }
                                 mailboxProcessor.allActionsCompleted();
                             }
                         });
@@ -247,6 +238,11 @@ public class SourceStreamTask<
         public void run() {
             try {
                 headOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
+                if (!wasStoppedExternally && !isCanceled()) {
+                    synchronized (lock) {
+                        operatorChain.setIsStoppingBySyncSavepoint(false);
+                    }
+                }
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9bcbf40..6b6677d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -74,6 +74,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
+import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -87,6 +89,44 @@ import static org.junit.Assert.assertTrue;
  */
 public class SourceStreamTaskTest {
 
+    @Test
+    public void testInputEndedBeforeStopWithSavepointConfirmed() throws Exception {
+        CancelTestSource source =
+                new CancelTestSource(
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "src");
+        TestBoundedOneInputStreamOperator chainTail = new TestBoundedOneInputStreamOperator("t");
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOperatorChain(
+                                new OperatorID(),
+                                new StreamSource<String, CancelTestSource>(source))
+                        .chain(
+                                new OperatorID(),
+                                chainTail,
+                                STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .finish()
+                        .build();
+        harness.getStreamTask()
+                .getConfiguration()
+                .setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        Future<Boolean> triggerFuture =
+                harness.streamTask.triggerCheckpointAsync(
+                        new CheckpointMetaData(1, 1),
+                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                        false);
+        while (!triggerFuture.isDone()) {
+            harness.streamTask.runMailboxStep();
+        }
+        // instead of completing stop with savepoint via `notifyCheckpointCompleted`
+        // we simulate that source has finished first. As a result, we expect that the endInput
+        // should have been issued
+        source.cancel();
+        harness.streamTask.invoke();
+        harness.waitForTaskCompletion();
+        assertTrue(TestBoundedOneInputStreamOperator.isInputEnded());
+    }
+
     /** This test verifies that open() and close() are correctly called by the StreamTask. */
     @Test
     public void testOpenClose() throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a0e69d8..bb6169e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -424,73 +424,12 @@ public class SavepointITCase extends TestLogger {
 
         static void resetForTest(int parallelism, boolean allowSnapshots) {
             progressLatch = new CountDownLatch(parallelism);
-            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : 1);
             snapshotStartedLatch = new CountDownLatch(parallelism);
             inputEnded = false;
         }
     }
 
-    @Test
-    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
-        final int numTaskManagers = 2;
-        final int numSlotsPerTaskManager = 2;
-
-        while (true) {
-
-            final MiniClusterResourceFactory clusterFactory =
-                    new MiniClusterResourceFactory(
-                            numTaskManagers,
-                            numSlotsPerTaskManager,
-                            getFileBasedCheckpointsConfig());
-
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-            env.setParallelism(1);
-
-            // It's only possible to test this with chaining. Without it, JM fails the job before
-            // the downstream gets the abort notification
-            BoundedPassThroughOperator<Integer> operator =
-                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
-            InfiniteTestSource source = new InfiniteTestSource();
-            DataStream<Integer> stream =
-                    env.addSource(source)
-                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
-
-            stream.addSink(new DiscardingSink<>());
-
-            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-            final JobID jobId = jobGraph.getJobID();
-
-            MiniClusterWithClientResource cluster = clusterFactory.get();
-            cluster.before();
-            ClusterClient<?> client = cluster.getClusterClient();
-
-            try {
-                BoundedPassThroughOperator.resetForTest(1, false);
-                InfiniteTestSource.resetForTest();
-
-                client.submitJob(jobGraph).get();
-
-                BoundedPassThroughOperator.getProgressLatch().await();
-                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
-                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
-                BoundedPassThroughOperator.awaitSnapshotStarted();
-                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
-                BoundedPassThroughOperator.allowSnapshots();
-                stop.get();
-                Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded);
-                return;
-            } catch (Exception e) {
-                // if sources and the whole job ends before the checkpoint completes
-                // then coordinator will shut down and savepoint will be aborted - retry
-                if (!ischeckpointcoordinatorshutdownError(e)) {
-                    throw e;
-                }
-            } finally {
-                cluster.after();
-            }
-        }
-    }
-
     private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
         return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
                 .filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
@@ -759,10 +698,6 @@ public class SavepointITCase extends TestLogger {
                 new CopyOnWriteArrayList<>();
         private transient volatile CompletableFuture<Void> completeFuture;
 
-        public InfiniteTestSource() {
-            createdSources.add(this);
-        }
-
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
             completeFuture = new CompletableFuture<>();
@@ -779,6 +714,7 @@ public class SavepointITCase extends TestLogger {
                 completeFuture.complete(null);
             } catch (Exception e) {
                 completeFuture.completeExceptionally(e);
+                throw e;
             }
         }
 


[flink] 01/02: [hotfix][tests] Import static constant in SourceStreamTaskTest

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d342679a75e8886afbd9c763ca668be67ccc4385
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 10 14:37:17 2021 +0100

    [hotfix][tests] Import static constant in SourceStreamTaskTest
---
 .../runtime/tasks/SourceStreamTaskTest.java        | 31 +++++++++++-----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d7cb5a7..9bcbf40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -73,6 +73,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -90,7 +91,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testOpenClose() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
 
@@ -113,8 +114,7 @@ public class SourceStreamTaskTest {
     public void testStartDelayMetric() throws Exception {
         long sleepTime = 42;
         StreamTaskMailboxTestHarnessBuilder<String> builder =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                        SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final Map<String, Metric> metrics = new ConcurrentHashMap<>();
         final TaskMetricGroup taskMetricGroup =
@@ -124,7 +124,7 @@ public class SourceStreamTaskTest {
                 builder.setupOutputForSingletonOperatorChain(
                                 new StreamSource<>(
                                         new CancelTestSource(
-                                                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                                STRING_TYPE_INFO.createSerializer(
                                                         new ExecutionConfig()),
                                                 "Hello")))
                         .setTaskMetricGroup(taskMetricGroup)
@@ -227,7 +227,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testClosingAllOperatorsOnChainProperly() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
@@ -238,7 +238,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -263,20 +263,19 @@ public class SourceStreamTaskTest {
     @Test
     public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         testHarness
                 .setupOperatorChain(
                         new OperatorID(),
                         new StreamSource<>(
                                 new CancelTestSource(
-                                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
-                                                new ExecutionConfig()),
+                                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                         "Hello")))
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -331,7 +330,7 @@ public class SourceStreamTaskTest {
     public void testCancellationWithSourceBlockedOnLock(
             boolean withPendingMail, boolean throwInCancel) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -341,7 +340,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -444,7 +443,7 @@ public class SourceStreamTaskTest {
     private void testInterruptionExceptionNotSwallowed(
             InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         CancelLockingSource.reset();
         testHarness
@@ -454,7 +453,7 @@ public class SourceStreamTaskTest {
                 .chain(
                         new OperatorID(),
                         new TestBoundedOneInputStreamOperator("Operator1"),
-                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
@@ -498,7 +497,7 @@ public class SourceStreamTaskTest {
     @Test
     public void finishingIgnoresExceptions() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         final CompletableFuture<Void> operatorRunningWaitingFuture = new CompletableFuture<>();
         ExceptionThrowingSource.setIsInRunLoopFuture(operatorRunningWaitingFuture);
@@ -518,7 +517,7 @@ public class SourceStreamTaskTest {
     @Test
     public void testWaitsForSourceThreadOnCancel() throws Exception {
         StreamTaskTestHarness<String> harness =
-                new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+                new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
         harness.setupOutputForSingletonOperatorChain();
         harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));