You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 14:03:07 UTC

[flink] 02/02: [FLINK-24749][coordination] Reuse CheckpointStatsTracker

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f80cfde34749e882f2171bb9ef070feb1624847
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 16:58:53 2021 +0100

    [FLINK-24749][coordination] Reuse CheckpointStatsTracker
---
 .../DefaultExecutionGraphBuilder.java              |  13 +--
 .../scheduler/DefaultExecutionGraphFactory.java    |  15 ++-
 .../TestingDefaultExecutionGraphBuilder.java       |   4 +-
 .../adaptive/AdaptiveSchedulerClusterITCase.java   | 104 +++++++++++++++++++++
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 4a61dd3..d1c6d6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -64,6 +63,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -93,7 +93,8 @@ public class DefaultExecutionGraphBuilder {
             ExecutionStateUpdateListener executionStateUpdateListener,
             long initializationTimestamp,
             VertexAttemptNumberStore vertexAttemptNumberStore,
-            VertexParallelismStore vertexParallelismStore)
+            VertexParallelismStore vertexParallelismStore,
+            Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory)
             throws JobExecutionException, JobException {
 
         checkNotNull(jobGraph, "job graph cannot be null");
@@ -204,12 +205,6 @@ public class DefaultExecutionGraphBuilder {
         if (isCheckpointingEnabled(jobGraph)) {
             JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
 
-            // Maximum number of remembered checkpoints
-            int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
-
-            CheckpointStatsTracker checkpointStatsTracker =
-                    new CheckpointStatsTracker(historySize, metrics);
-
             // load the state backend from the application settings
             final StateBackend applicationConfiguredBackend;
             final SerializedValue<StateBackend> serializedAppConfigured =
@@ -316,7 +311,7 @@ public class DefaultExecutionGraphBuilder {
                     completedCheckpointStore,
                     rootBackend,
                     rootStorage,
-                    checkpointStatsTracker,
+                    checkpointStatsTrackerFactory.get(),
                     checkpointsCleaner);
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index f1721c7..0f49909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
@@ -38,12 +40,14 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.CachingSupplier;
 
 import org.slf4j.Logger;
 
 import java.util.HashSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 /** Default {@link ExecutionGraphFactory} implementation. */
 public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
@@ -58,6 +62,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
     private final BlobWriter blobWriter;
     private final ShuffleMaster<?> shuffleMaster;
     private final JobMasterPartitionTracker jobMasterPartitionTracker;
+    private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
 
     public DefaultExecutionGraphFactory(
             Configuration configuration,
@@ -80,6 +85,13 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
         this.blobWriter = blobWriter;
         this.shuffleMaster = shuffleMaster;
         this.jobMasterPartitionTracker = jobMasterPartitionTracker;
+        this.checkpointStatsTrackerFactory =
+                new CachingSupplier<>(
+                        () ->
+                                new CheckpointStatsTracker(
+                                        configuration.getInteger(
+                                                WebOptions.CHECKPOINTS_HISTORY_SIZE),
+                                        jobManagerJobMetricGroup));
     }
 
     @Override
@@ -124,7 +136,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                         executionStateUpdateListener,
                         initializationTimestamp,
                         vertexAttemptNumberStore,
-                        vertexParallelismStore);
+                        vertexParallelismStore,
+                        checkpointStatsTrackerFactory);
 
         final CheckpointCoordinator checkpointCoordinator =
                 newExecutionGraph.getCheckpointCoordinator();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index bde0cbb..edf85c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -185,6 +186,7 @@ public class TestingDefaultExecutionGraphBuilder {
                 System.currentTimeMillis(),
                 new DefaultVertexAttemptNumberStore(),
                 Optional.ofNullable(vertexParallelismStore)
-                        .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)));
+                        .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
+                () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
index 9865395..4d1d7e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
@@ -25,11 +25,20 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
@@ -44,8 +53,14 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -76,6 +91,8 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
         configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
         configuration.set(
                 JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
+        // required for #testCheckpointStatsPersistedAcrossRescale
+        configuration.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE);
 
         return configuration;
     }
@@ -141,6 +158,93 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
         assertTrue(jobResultFuture.join().isSuccess());
     }
 
+    @Test
+    public void testCheckpointStatsPersistedAcrossRescale() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+        JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID);
+        jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class);
+        jobVertex.setParallelism(PARALLELISM);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder()
+                                .setCheckpointInterval(100)
+                                .setCheckpointTimeout(1000)
+                                .build(),
+                        null));
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until some checkpoints have been completed
+        CommonTestUtils.waitUntilCondition(
+                () ->
+                        miniCluster
+                                .getExecutionGraph(jobGraph.getJobID())
+                                .thenApply(
+                                        eg ->
+                                                eg.getCheckpointStatsSnapshot()
+                                                                .getCounts()
+                                                                .getNumberOfCompletedCheckpoints()
+                                                        > 0)
+                                .get(),
+                Deadline.fromNow(Duration.ofHours(1)));
+
+        miniCluster.terminateTaskManager(0);
+
+        waitUntilParallelismForVertexReached(
+                jobGraph.getJobID(),
+                JOB_VERTEX_ID,
+                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1));
+
+        // check that the very first checkpoint is still accessible
+        final List<AbstractCheckpointStats> checkpointHistory =
+                miniCluster
+                        .getExecutionGraph(jobGraph.getJobID())
+                        .thenApply(
+                                eg -> eg.getCheckpointStatsSnapshot().getHistory().getCheckpoints())
+                        .get();
+        assertThat(checkpointHistory.get(checkpointHistory.size() - 1).getCheckpointId(), is(1L));
+    }
+
+    /** An invokable that doesn't do anything interesting, but does support checkpointing. */
+    public static class CheckpointingNoOpInvokable extends AbstractInvokable {
+
+        private static final long CANCEL_SIGNAL = -2L;
+        private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1);
+
+        public CheckpointingNoOpInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            long signal = checkpointsToConfirm.take();
+            while (signal != CANCEL_SIGNAL) {
+                getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
+                signal = checkpointsToConfirm.take();
+            }
+        }
+
+        @Override
+        public void cancel() throws Exception {
+            checkpointsToConfirm.add(CANCEL_SIGNAL);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> triggerCheckpointAsync(
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+            checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
     private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
         final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID);