You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/19 12:29:57 UTC

[flink] branch release-1.12 updated (a153976 -> e09c919)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a153976  [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery
     new 7e6fb13  [hotfix][runtime] Log checkpoint processing delay if above threshold
     new e09c919  [FLINK-21329][tests] Increase timeout and delay in local test_local_recovery_and_scheduling.sh

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-end-to-end-tests/run-nightly-tests.sh         | 12 ++++++------
 .../test_local_recovery_and_scheduling.sh           |  8 +++++---
 .../runtime/checkpoint/CheckpointMetaData.java      | 21 +++++++++++++++++++--
 .../org/apache/flink/runtime/taskmanager/Task.java  |  3 ++-
 .../runtime/io/CheckpointBarrierHandler.java        |  5 ++++-
 .../runtime/tasks/SourceOperatorStreamTask.java     |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java   |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java     | 13 +++++++++++++
 8 files changed, 51 insertions(+), 15 deletions(-)

[flink] 01/02: [hotfix][runtime] Log checkpoint processing delay if above threshold

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e6fb13d8bded83fa5d3c4b19d5b388cd2418010
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Apr 15 12:58:14 2021 +0200

    [hotfix][runtime] Log checkpoint processing delay if above threshold
---
 .../runtime/checkpoint/CheckpointMetaData.java      | 21 +++++++++++++++++++--
 .../org/apache/flink/runtime/taskmanager/Task.java  |  3 ++-
 .../runtime/io/CheckpointBarrierHandler.java        |  5 ++++-
 .../runtime/tasks/SourceOperatorStreamTask.java     |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java   |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java     | 13 +++++++++++++
 6 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 25540ba..1d8e15e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -28,12 +28,20 @@ public class CheckpointMetaData implements Serializable {
     /** The ID of the checkpoint. */
     private final long checkpointId;
 
-    /** The timestamp of the checkpoint. */
+    /** The timestamp at which the checkpoint was triggered. */
     private final long timestamp;
 
+    /** The timestamp at which this subtask received the checkpoint. */
+    private final long receiveTimestamp;
+
     public CheckpointMetaData(long checkpointId, long timestamp) {
+        this(checkpointId, timestamp, System.currentTimeMillis());
+    }
+
+    public CheckpointMetaData(long checkpointId, long timestamp, long receiveTimestamp) {
         this.checkpointId = checkpointId;
         this.timestamp = timestamp;
+        this.receiveTimestamp = receiveTimestamp;
     }
 
     public long getCheckpointId() {
@@ -44,6 +52,10 @@ public class CheckpointMetaData implements Serializable {
         return timestamp;
     }
 
+    public long getReceiveTimestamp() {
+        return receiveTimestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -55,13 +67,16 @@ public class CheckpointMetaData implements Serializable {
 
         CheckpointMetaData that = (CheckpointMetaData) o;
 
-        return (checkpointId == that.checkpointId) && (timestamp == that.timestamp);
+        return (checkpointId == that.checkpointId)
+                && (timestamp == that.timestamp)
+                && (receiveTimestamp == that.receiveTimestamp);
     }
 
     @Override
     public int hashCode() {
         int result = (int) (checkpointId ^ (checkpointId >>> 32));
         result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + (int) (receiveTimestamp ^ (receiveTimestamp >>> 32));
         return result;
     }
 
@@ -70,6 +85,8 @@ public class CheckpointMetaData implements Serializable {
         return "CheckpointMetaData{"
                 + "checkpointId="
                 + checkpointId
+                + ", receiveTimestamp="
+                + receiveTimestamp
                 + ", timestamp="
                 + timestamp
                 + '}';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 288dd4a..3001018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1258,7 +1258,8 @@ public class Task
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointID, checkpointTimestamp);
+                new CheckpointMetaData(
+                        checkpointID, checkpointTimestamp, System.currentTimeMillis());
 
         if (executionState == ExecutionState.RUNNING && invokable != null) {
             try {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 132fd78..72cef441 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -104,7 +104,10 @@ public abstract class CheckpointBarrierHandler implements Closeable {
 
     protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
         CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+                new CheckpointMetaData(
+                        checkpointBarrier.getId(),
+                        checkpointBarrier.getTimestamp(),
+                        System.currentTimeMillis());
 
         CheckpointMetricsBuilder checkpointMetrics =
                 new CheckpointMetricsBuilder()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index cd7e25a..aea97e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -144,7 +144,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
         final long timestamp = System.currentTimeMillis();
 
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp);
+                new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
         super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8e2c959..711f921 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -107,7 +107,7 @@ public class SourceStreamTask<
                             final long timestamp = System.currentTimeMillis();
 
                             final CheckpointMetaData checkpointMetaData =
-                                    new CheckpointMetaData(checkpointId, timestamp);
+                                    new CheckpointMetaData(checkpointId, timestamp, timestamp);
 
                             try {
                                 SourceStreamTask.super
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5f3ffbf..bf59515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -75,6 +75,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
     private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
+    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
+
     private final CachingCheckpointStorageWorkerView checkpointStorage;
     private final String taskName;
     private final ExecutorService asyncOperationsThreadPool;
@@ -260,6 +262,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             return;
         }
 
+        logCheckpointProcessingDelay(metadata);
+
         // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
         // if necessary.
         lastCheckpointId = metadata.getCheckpointId();
@@ -695,4 +699,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             throw ex;
         }
     }
+
+    private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
+        long delay = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp();
+        if (delay >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
+            LOG.warn(
+                    "Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: {}ms",
+                    delay);
+        }
+    }
 }

[flink] 02/02: [FLINK-21329][tests] Increase timeout and delay in local test_local_recovery_and_scheduling.sh

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e09c919103a19b97abd169732e44ef7231fab1be
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Apr 15 13:05:21 2021 +0200

    [FLINK-21329][tests] Increase timeout and delay in local test_local_recovery_and_scheduling.sh
    
    Without an injected delay at the source, back-pressure is likely
    to occur. This can delay checkpoint triggering at the sources,
    which in turn can cause the test to time out.
    
    To prevent this, the test timeout is increased from 10m to 15m
    and an injected source delay of 100ms is added.
---
 flink-end-to-end-tests/run-nightly-tests.sh                  | 12 ++++++------
 .../test-scripts/test_local_recovery_and_scheduling.sh       |  8 +++++---
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 3bb4ec4..4a00628 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -236,12 +236,12 @@ fi
 # Sticky Scheduling
 ################################################################################
 
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false true" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true" "skip_check_exceptions"
-run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false 100" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false true 100" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false 100" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false 100" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true 100" "skip_check_exceptions"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true 100" "skip_check_exceptions"
 
 printf "\n[PASS] All bash e2e-tests passed\n"
 
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
index 233e62e..5b192dd 100755
--- a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -60,13 +60,15 @@ function run_local_recovery_test {
     local backend=$3
     local incremental=$4
     local kill_jvm=$5
+    local delay=$6
 
     echo "Running local recovery test with configuration:
         parallelism: ${parallelism}
         max attempts: ${max_attempts}
         backend: ${backend}
         incremental checkpoints: ${incremental}
-        kill JVM: ${kill_jvm}"
+        kill JVM: ${kill_jvm}
+        delay: ${delay}ms"
 
     TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
     # configure for HA
@@ -97,11 +99,11 @@ function run_local_recovery_test {
     --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
     --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
     --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
-    --incrementalCheckpoints ${incremental}
+    --incrementalCheckpoints ${incremental} --delay ${delay}
 
     check_logs ${parallelism} ${max_attempts}
     cleanup_after_test
 }
 
 ## MAIN
-run_test_with_timeout 600 run_local_recovery_test "$@"
+run_test_with_timeout 900 run_local_recovery_test "$@"