You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 14:03:07 UTC
[flink] 02/02: [FLINK-24749][coordination] Reuse CheckpointStatsTracker
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f80cfde34749e882f2171bb9ef070feb1624847
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 16:58:53 2021 +0100
[FLINK-24749][coordination] Reuse CheckpointStatsTracker
---
.../DefaultExecutionGraphBuilder.java | 13 +--
.../scheduler/DefaultExecutionGraphFactory.java | 15 ++-
.../TestingDefaultExecutionGraphBuilder.java | 4 +-
.../adaptive/AdaptiveSchedulerClusterITCase.java | 104 +++++++++++++++++++++
4 files changed, 125 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 4a61dd3..d1c6d6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -64,6 +63,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -93,7 +93,8 @@ public class DefaultExecutionGraphBuilder {
ExecutionStateUpdateListener executionStateUpdateListener,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
- VertexParallelismStore vertexParallelismStore)
+ VertexParallelismStore vertexParallelismStore,
+ Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory)
throws JobExecutionException, JobException {
checkNotNull(jobGraph, "job graph cannot be null");
@@ -204,12 +205,6 @@ public class DefaultExecutionGraphBuilder {
if (isCheckpointingEnabled(jobGraph)) {
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
- // Maximum number of remembered checkpoints
- int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
-
- CheckpointStatsTracker checkpointStatsTracker =
- new CheckpointStatsTracker(historySize, metrics);
-
// load the state backend from the application settings
final StateBackend applicationConfiguredBackend;
final SerializedValue<StateBackend> serializedAppConfigured =
@@ -316,7 +311,7 @@ public class DefaultExecutionGraphBuilder {
completedCheckpointStore,
rootBackend,
rootStorage,
- checkpointStatsTracker,
+ checkpointStatsTrackerFactory.get(),
checkpointsCleaner);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index f1721c7..0f49909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
@@ -38,12 +40,14 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.CachingSupplier;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
/** Default {@link ExecutionGraphFactory} implementation. */
public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
@@ -58,6 +62,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
private final BlobWriter blobWriter;
private final ShuffleMaster<?> shuffleMaster;
private final JobMasterPartitionTracker jobMasterPartitionTracker;
+ private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
public DefaultExecutionGraphFactory(
Configuration configuration,
@@ -80,6 +85,13 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
this.blobWriter = blobWriter;
this.shuffleMaster = shuffleMaster;
this.jobMasterPartitionTracker = jobMasterPartitionTracker;
+ this.checkpointStatsTrackerFactory =
+ new CachingSupplier<>(
+ () ->
+ new CheckpointStatsTracker(
+ configuration.getInteger(
+ WebOptions.CHECKPOINTS_HISTORY_SIZE),
+ jobManagerJobMetricGroup));
}
@Override
@@ -124,7 +136,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
executionStateUpdateListener,
initializationTimestamp,
vertexAttemptNumberStore,
- vertexParallelismStore);
+ vertexParallelismStore,
+ checkpointStatsTrackerFactory);
final CheckpointCoordinator checkpointCoordinator =
newExecutionGraph.getCheckpointCoordinator();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index bde0cbb..edf85c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -185,6 +186,7 @@ public class TestingDefaultExecutionGraphBuilder {
System.currentTimeMillis(),
new DefaultVertexAttemptNumberStore(),
Optional.ofNullable(vertexParallelismStore)
- .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)));
+ .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
+ () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
index 9865395..4d1d7e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
@@ -25,11 +25,20 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
@@ -44,8 +53,14 @@ import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertTrue;
/**
@@ -76,6 +91,8 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
configuration.set(
JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
+ // required for #testCheckpointStatsPersistedAcrossRescale
+ configuration.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE);
return configuration;
}
@@ -141,6 +158,93 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
assertTrue(jobResultFuture.join().isSuccess());
}
+ @Test
+ public void testCheckpointStatsPersistedAcrossRescale() throws Exception {
+ final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+ JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID);
+ jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class);
+ jobVertex.setParallelism(PARALLELISM);
+
+ final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
+ jobGraph.setSnapshotSettings(
+ new JobCheckpointingSettings(
+ CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointInterval(100)
+ .setCheckpointTimeout(1000)
+ .build(),
+ null));
+
+ miniCluster.submitJob(jobGraph).join();
+
+ // wait until some checkpoints have been completed
+ CommonTestUtils.waitUntilCondition(
+ () ->
+ miniCluster
+ .getExecutionGraph(jobGraph.getJobID())
+ .thenApply(
+ eg ->
+ eg.getCheckpointStatsSnapshot()
+ .getCounts()
+ .getNumberOfCompletedCheckpoints()
+ > 0)
+ .get(),
+ Deadline.fromNow(Duration.ofHours(1)));
+
+ miniCluster.terminateTaskManager(0);
+
+ waitUntilParallelismForVertexReached(
+ jobGraph.getJobID(),
+ JOB_VERTEX_ID,
+ NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1));
+
+ // check that the very first checkpoint is still accessible
+ final List<AbstractCheckpointStats> checkpointHistory =
+ miniCluster
+ .getExecutionGraph(jobGraph.getJobID())
+ .thenApply(
+ eg -> eg.getCheckpointStatsSnapshot().getHistory().getCheckpoints())
+ .get();
+ assertThat(checkpointHistory.get(checkpointHistory.size() - 1).getCheckpointId(), is(1L));
+ }
+
+ /** An invokable that doesn't do anything interesting, but does support checkpointing. */
+ public static class CheckpointingNoOpInvokable extends AbstractInvokable {
+
+ private static final long CANCEL_SIGNAL = -2L;
+ private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1);
+
+ public CheckpointingNoOpInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ long signal = checkpointsToConfirm.take();
+ while (signal != CANCEL_SIGNAL) {
+ getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
+ signal = checkpointsToConfirm.take();
+ }
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ checkpointsToConfirm.add(CANCEL_SIGNAL);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+ checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID);