You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2021/11/10 15:56:55 UTC

[flink-ml] branch master updated (a0331de -> acbf4b9)

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git.


    from a0331de  [FLINK-24729][iteration] Support iteration with mixed operator life-cycle
     add a2294e6  [FLINK-24807][iteration] Support snapshotting the ReplayOperator
     add 7a05cf8  [FLINK-24807][iteration] Stores the state for per-round wrapper
     add 68c5945  [hotfix][iteration] Rename the all-round checkpoint test to be an IT case
     add 0b033df  [FLINK-24807][iteration] Add per-round checkpoint IT case
     add f06a818  [FLINK-24807][iteration] Do not emit CoordinatorCheckpointEvent after terminating
     add 8d3f2ad  [FLINK-24807][iteration] Support raw operator state
     new acbf4b9  [FLINK-24807][iteration] Do not start logging at the head operator if the barrier is fed back first

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/iteration/checkpoint/Checkpoints.java    |  22 ++
 .../datacache/nonkeyed/DataCacheReader.java        |  35 ++-
 .../datacache/nonkeyed/DataCacheSnapshot.java      |   2 +-
 .../datacache/nonkeyed/DataCacheWriter.java        |  12 +-
 .../flink/iteration/operator/HeadOperator.java     |  25 +-
 .../operator/HeadOperatorCheckpointAligner.java    |  16 +-
 .../flink/iteration/operator/ReplayOperator.java   | 226 +++++++++++-----
 .../coordinator/HeadOperatorCoordinator.java       |   3 +
 .../coordinator/SharedProgressAligner.java         |  20 +-
 .../event/TerminatingOnInitializeEvent.java        |  25 +-
 .../headprocessor/HeadOperatorRecordProcessor.java |   2 +
 .../operator/headprocessor/HeadOperatorState.java  |   4 +
 .../TerminatingHeadOperatorRecordProcessor.java    |  13 +-
 .../perround/AbstractPerRoundWrapperOperator.java  | 184 ++++++++++++-
 .../proxy/state/ProxyStateSnapshotContext.java     |   3 +-
 .../state/ProxyStreamOperatorStateContext.java     |  47 +++-
 .../flink/iteration/operator/HeadOperatorTest.java |  35 ++-
 .../iteration/operator/ReplayOperatorTest.java     | 295 ++++++++++++++++++---
 .../coordinator/SharedProgressAlignerTest.java     |  66 +++--
 .../OneInputPerRoundWrapperOperatorTest.java       | 172 ++++++++++++
 ...t.java => BoundedAllRoundCheckpointITCase.java} |   4 +-
 ...t.java => BoundedPerRoundCheckpointITCase.java} | 113 ++++----
 .../BoundedPerRoundStreamIterationITCase.java      |   2 +-
 .../flink/test/iteration/operators/FailingMap.java |   4 +-
 .../operators/TwoInputReducePerRoundOperator.java  |  35 ++-
 25 files changed, 1127 insertions(+), 238 deletions(-)
 copy flink-ml-api/src/main/java/org/apache/flink/ml/param/ParamValidator.java => flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/event/TerminatingOnInitializeEvent.java (58%)
 copy flink-ml-tests/src/test/java/org/apache/flink/test/iteration/{BoundedAllRoundCheckpointTest.java => BoundedAllRoundCheckpointITCase.java} (99%)
 rename flink-ml-tests/src/test/java/org/apache/flink/test/iteration/{BoundedAllRoundCheckpointTest.java => BoundedPerRoundCheckpointITCase.java} (58%)

[flink-ml] 01/01: [FLINK-24807][iteration] Do not start logging at the head operator if the barrier is fed back first

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit acbf4b953c22c5294894a0b753c6d3a1211cf643
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Wed Nov 10 22:27:49 2021 +0800

    [FLINK-24807][iteration] Do not start logging at the head operator if the barrier is fed back first
    
    This closes #25.
---
 .../flink/iteration/checkpoint/Checkpoints.java    | 22 +++++++++++++++
 .../flink/iteration/operator/HeadOperatorTest.java | 32 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
index edbfeba..9a16432 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
@@ -48,11 +48,25 @@ public class Checkpoints<T> implements AutoCloseable {
     private final FileSystem fileSystem;
     private final SupplierWithException<Path, IOException> pathSupplier;
 
+    /**
+     * Stores the pending checkpoints together with a flag telling whether each was canceled.
+     * This field is shared between the head and tail operators for aborting checkpoints.
+     */
     private final ConcurrentHashMap<Long, Tuple2<PendingCheckpoint, Boolean>>
             uncompletedCheckpoints = new ConcurrentHashMap<>();
 
+    /**
+     * Stores the list of pending checkpoints ordered by the checkpoint id. This field is only
+     * accessed by the head operator.
+     */
     private final TreeMap<Long, PendingCheckpoint> sortedUncompletedCheckpoints = new TreeMap<>();
 
+    /**
+     * Id of the latest completed checkpoint. startLogging ignores checkpoints up to this id, e.g.
+     * when the feedback barrier is processed before the head operator snapshots the state.
+     */
+    private long latestCompletedCheckpointId;
+
     public Checkpoints(
             TypeSerializer<T> typeSerializer,
             FileSystem fileSystem,
@@ -77,6 +91,10 @@ public class Checkpoints<T> implements AutoCloseable {
 
     public void startLogging(long checkpointId, OperatorStateCheckpointOutputStream outputStream)
             throws IOException {
+        if (checkpointId <= latestCompletedCheckpointId) {
+            return;
+        }
+
         Tuple2<PendingCheckpoint, Boolean> possibleCheckpoint =
                 uncompletedCheckpoints.computeIfAbsent(
                         checkpointId,
@@ -123,6 +141,10 @@ public class Checkpoints<T> implements AutoCloseable {
     }
 
     public void commitCheckpointsUntil(long checkpointId) {
+        if (latestCompletedCheckpointId < checkpointId) {
+            latestCompletedCheckpointId = checkpointId;
+        }
+
         SortedMap<Long, PendingCheckpoint> completedCheckpoints =
                 sortedUncompletedCheckpoints.headMap(checkpointId, true);
         completedCheckpoints
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
index 3a407a0..c1fe602 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
@@ -767,6 +767,38 @@ public class HeadOperatorTest extends TestLogger {
                 });
     }
 
+    @Test(timeout = 20000)
+    public void testCheckpointsWithBarrierFeedbackFirst() throws Exception {
+        IterationID iterationId = new IterationID();
+        OperatorID operatorId = new OperatorID();
+
+        createHarnessAndRun(
+                iterationId,
+                operatorId,
+                null,
+                harness -> {
+                    harness.getTaskStateManager().getWaitForReportLatch().reset();
+                    harness.processElement(new StreamRecord<>(IterationRecord.newRecord(100, 0)));
+                    harness.processAll();
+
+                    harness.getStreamTask()
+                            .triggerCheckpointAsync(
+                                    new CheckpointMetaData(2, 1000),
+                                    CheckpointOptions.alignedNoTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            CheckpointStorageLocationReference.getDefault()));
+
+                    // Simulates the barrier being fed back before the
+                    // CoordinatorCheckpointEvent is dispatched. If this case were not handled,
+                    // there would be a deadlock and the report-latch await below would hang.
+                    putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
+                    dispatchOperatorEvent(harness, operatorId, new CoordinatorCheckpointEvent(2));
+                    harness.processAll();
+                    harness.getTaskStateManager().getWaitForReportLatch().await();
+                    return null;
+                });
+    }
+
     private <T> T createHarnessAndRun(
             IterationID iterationId,
             OperatorID operatorId,