You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/04 12:34:46 UTC
[flink] branch master updated: [FLINK-24748] Remove
CheckpointStatsTracker#getJobCheckpointingConfiguration
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1b44906 [FLINK-24748] Remove CheckpointStatsTracker#getJobCheckpointingConfiguration
1b44906 is described below
commit 1b4490654e0438332c2a4cac679c05a321d3f34c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 16:55:21 2021 +0100
[FLINK-24748] Remove CheckpointStatsTracker#getJobCheckpointingConfiguration
The CheckpointCoordinatorConfiguration was unnecessarily stored in the CheckpointStatsTracker. We can store it directly in the ExecutionGraph instead.
---
.../runtime/checkpoint/CheckpointStatsTracker.java | 20 +--------
.../executiongraph/DefaultExecutionGraph.java | 8 +++-
.../DefaultExecutionGraphBuilder.java | 5 +--
.../checkpoint/CheckpointCoordinatorTest.java | 10 +----
.../checkpoint/CheckpointStatsTrackerTest.java | 52 +++-------------------
5 files changed, 15 insertions(+), 80 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 8c4ac97..8739261 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -24,7 +24,6 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import javax.annotation.Nullable;
@@ -62,9 +61,6 @@ public class CheckpointStatsTracker {
*/
private final ReentrantLock statsReadWriteLock = new ReentrantLock();
- /** Snapshotting settings created from the CheckpointConfig. */
- private final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
-
/** Checkpoint counts. */
private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
@@ -94,17 +90,12 @@ public class CheckpointStatsTracker {
*
* @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in
* progress ones.
- * @param jobCheckpointingConfiguration Checkpointing configuration.
* @param metricGroup Metric group for exposed metrics
*/
- public CheckpointStatsTracker(
- int numRememberedCheckpoints,
- CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
- MetricGroup metricGroup) {
+ public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
- this.jobCheckpointingConfiguration = checkNotNull(jobCheckpointingConfiguration);
// Latest snapshot is empty
latestSnapshot =
@@ -119,15 +110,6 @@ public class CheckpointStatsTracker {
}
/**
- * Returns the job's checkpointing configuration which is derived from the CheckpointConfig.
- *
- * @return The job's checkpointing configuration.
- */
- public CheckpointCoordinatorConfiguration getJobCheckpointingConfiguration() {
- return jobCheckpointingConfiguration;
- }
-
- /**
* Creates a new snapshot of the available stats.
*
* @return The latest statistics snapshot.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 2a494ff..9338261 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -235,6 +235,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
// ------ Fields that are relevant to the execution and need to be cleared before archiving
// -------
+ @Nullable private CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration;
+
/** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
@Nullable private CheckpointCoordinator checkpointCoordinator;
@@ -405,6 +407,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
buildOpCoordinatorCheckpointContexts();
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
+ checkpointCoordinatorConfiguration =
+ checkNotNull(chkConfig, "CheckpointCoordinatorConfiguration");
CheckpointFailureManager failureManager =
new CheckpointFailureManager(
@@ -494,8 +498,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
@Override
public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
- if (checkpointStatsTracker != null) {
- return checkpointStatsTracker.getJobCheckpointingConfiguration();
+ if (checkpointCoordinatorConfiguration != null) {
+ return checkpointCoordinatorConfiguration;
} else {
return null;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 7100f41..4a61dd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -208,10 +208,7 @@ public class DefaultExecutionGraphBuilder {
int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsTracker checkpointStatsTracker =
- new CheckpointStatsTracker(
- historySize,
- snapshotSettings.getCheckpointCoordinatorConfiguration(),
- metrics);
+ new CheckpointStatsTracker(historySize, metrics);
// load the state backend from the application settings
final StateBackend applicationConfiguredBackend;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index e1a87e3..6a792c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -189,10 +189,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex lateReportVertex =
executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(
- Integer.MAX_VALUE,
- CheckpointCoordinatorConfiguration.builder().build(),
- new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
CheckpointCoordinator coordinator =
new CheckpointCoordinatorBuilder()
.setExecutionGraph(executionGraph)
@@ -437,10 +434,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.build();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(
- Integer.MAX_VALUE,
- CheckpointCoordinatorConfiguration.builder().build(),
- new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
checkpointCoordinator.setCheckpointStatsTracker(statsTracker);
// nothing should be happening
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 0e97958..96bbfa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.junit.Test;
@@ -42,38 +40,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
public class CheckpointStatsTrackerTest {
- /** Tests access to the snapshotting settings. */
- @Test
- public void testGetSnapshottingSettings() throws Exception {
- JobCheckpointingSettings snapshottingSettings =
- new JobCheckpointingSettings(
- new CheckpointCoordinatorConfiguration(
- 181238123L,
- 19191992L,
- 191929L,
- 123,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- false,
- false,
- 0,
- 0),
- null);
-
- CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(
- 0,
- snapshottingSettings.getCheckpointCoordinatorConfiguration(),
- new UnregisteredMetricsGroup());
-
- assertEquals(
- snapshottingSettings.getCheckpointCoordinatorConfiguration(),
- tracker.getJobCheckpointingConfiguration());
- }
-
/** Tests that the number of remembered checkpoints configuration is respected. */
@Test
public void testTrackerWithoutHistory() throws Exception {
@@ -85,10 +54,7 @@ public class CheckpointStatsTrackerTest {
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(
- 0,
- mock(CheckpointCoordinatorConfiguration.class),
- new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
PendingCheckpointStats pending =
tracker.reportPendingCheckpoint(
@@ -136,10 +102,7 @@ public class CheckpointStatsTrackerTest {
singletonMap(jobVertexID, jobVertex.getParallelism());
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(
- 10,
- mock(CheckpointCoordinatorConfiguration.class),
- new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
// Completed checkpoint
PendingCheckpointStats completed1 =
@@ -254,10 +217,7 @@ public class CheckpointStatsTrackerTest {
public void testCreateSnapshot() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(
- 10,
- mock(CheckpointCoordinatorConfiguration.class),
- new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -313,7 +273,7 @@ public class CheckpointStatsTrackerTest {
}
};
- new CheckpointStatsTracker(0, mock(CheckpointCoordinatorConfiguration.class), metricGroup);
+ new CheckpointStatsTracker(0, metricGroup);
// Make sure this test is adjusted when further metrics are added
assertTrue(
@@ -360,9 +320,7 @@ public class CheckpointStatsTrackerTest {
.build();
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
- CheckpointStatsTracker stats =
- new CheckpointStatsTracker(
- 0, mock(CheckpointCoordinatorConfiguration.class), metricGroup);
+ CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup);
// Make sure to adjust this test if metrics are added/removed
assertEquals(10, registeredGauges.size());