You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/04 12:34:46 UTC

[flink] branch master updated: [FLINK-24748] Remove CheckpointStatsTracker#getJobCheckpointingConfiguration

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b44906  [FLINK-24748] Remove CheckpointStatsTracker#getJobCheckpointingConfiguration
1b44906 is described below

commit 1b4490654e0438332c2a4cac679c05a321d3f34c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 16:55:21 2021 +0100

    [FLINK-24748] Remove CheckpointStatsTracker#getJobCheckpointingConfiguration
    
    The CheckpointCoordinatorConfiguration was unnecessarily stored in the CheckpointStatsTracker. It is now stored directly in the ExecutionGraph (DefaultExecutionGraph) instead.
---
 .../runtime/checkpoint/CheckpointStatsTracker.java | 20 +--------
 .../executiongraph/DefaultExecutionGraph.java      |  8 +++-
 .../DefaultExecutionGraphBuilder.java              |  5 +--
 .../checkpoint/CheckpointCoordinatorTest.java      | 10 +----
 .../checkpoint/CheckpointStatsTrackerTest.java     | 52 +++-------------------
 5 files changed, 15 insertions(+), 80 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 8c4ac97..8739261 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -24,7 +24,6 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 
 import javax.annotation.Nullable;
 
@@ -62,9 +61,6 @@ public class CheckpointStatsTracker {
      */
     private final ReentrantLock statsReadWriteLock = new ReentrantLock();
 
-    /** Snapshotting settings created from the CheckpointConfig. */
-    private final CheckpointCoordinatorConfiguration jobCheckpointingConfiguration;
-
     /** Checkpoint counts. */
     private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
 
@@ -94,17 +90,12 @@ public class CheckpointStatsTracker {
      *
      * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in
      *     progress ones.
-     * @param jobCheckpointingConfiguration Checkpointing configuration.
      * @param metricGroup Metric group for exposed metrics
      */
-    public CheckpointStatsTracker(
-            int numRememberedCheckpoints,
-            CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
-            MetricGroup metricGroup) {
+    public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
 
         checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
         this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
-        this.jobCheckpointingConfiguration = checkNotNull(jobCheckpointingConfiguration);
 
         // Latest snapshot is empty
         latestSnapshot =
@@ -119,15 +110,6 @@ public class CheckpointStatsTracker {
     }
 
     /**
-     * Returns the job's checkpointing configuration which is derived from the CheckpointConfig.
-     *
-     * @return The job's checkpointing configuration.
-     */
-    public CheckpointCoordinatorConfiguration getJobCheckpointingConfiguration() {
-        return jobCheckpointingConfiguration;
-    }
-
-    /**
      * Creates a new snapshot of the available stats.
      *
      * @return The latest statistics snapshot.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 2a494ff..9338261 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -235,6 +235,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     // ------ Fields that are relevant to the execution and need to be cleared before archiving
     // -------
 
+    @Nullable private CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration;
+
     /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
     @Nullable private CheckpointCoordinator checkpointCoordinator;
 
@@ -405,6 +407,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
                 buildOpCoordinatorCheckpointContexts();
 
         checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
+        checkpointCoordinatorConfiguration =
+                checkNotNull(chkConfig, "CheckpointCoordinatorConfiguration");
 
         CheckpointFailureManager failureManager =
                 new CheckpointFailureManager(
@@ -494,8 +498,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
 
     @Override
     public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
-        if (checkpointStatsTracker != null) {
-            return checkpointStatsTracker.getJobCheckpointingConfiguration();
+        if (checkpointCoordinatorConfiguration != null) {
+            return checkpointCoordinatorConfiguration;
         } else {
             return null;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 7100f41..4a61dd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -208,10 +208,7 @@ public class DefaultExecutionGraphBuilder {
             int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 
             CheckpointStatsTracker checkpointStatsTracker =
-                    new CheckpointStatsTracker(
-                            historySize,
-                            snapshotSettings.getCheckpointCoordinatorConfiguration(),
-                            metrics);
+                    new CheckpointStatsTracker(historySize, metrics);
 
             // load the state backend from the application settings
             final StateBackend applicationConfiguredBackend;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index e1a87e3..6a792c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -189,10 +189,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ExecutionVertex lateReportVertex =
                 executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(
-                        Integer.MAX_VALUE,
-                        CheckpointCoordinatorConfiguration.builder().build(),
-                        new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         CheckpointCoordinator coordinator =
                 new CheckpointCoordinatorBuilder()
                         .setExecutionGraph(executionGraph)
@@ -437,10 +434,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .build();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(
-                        Integer.MAX_VALUE,
-                        CheckpointCoordinatorConfiguration.builder().build(),
-                        new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
         checkpointCoordinator.setCheckpointStatsTracker(statsTracker);
 
         // nothing should be happening
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 0e97958..96bbfa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 
 import org.junit.Test;
 
@@ -42,38 +40,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 public class CheckpointStatsTrackerTest {
 
-    /** Tests access to the snapshotting settings. */
-    @Test
-    public void testGetSnapshottingSettings() throws Exception {
-        JobCheckpointingSettings snapshottingSettings =
-                new JobCheckpointingSettings(
-                        new CheckpointCoordinatorConfiguration(
-                                181238123L,
-                                19191992L,
-                                191929L,
-                                123,
-                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                false,
-                                false,
-                                0,
-                                0),
-                        null);
-
-        CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(
-                        0,
-                        snapshottingSettings.getCheckpointCoordinatorConfiguration(),
-                        new UnregisteredMetricsGroup());
-
-        assertEquals(
-                snapshottingSettings.getCheckpointCoordinatorConfiguration(),
-                tracker.getJobCheckpointingConfiguration());
-    }
-
     /** Tests that the number of remembered checkpoints configuration is respected. */
     @Test
     public void testTrackerWithoutHistory() throws Exception {
@@ -85,10 +54,7 @@ public class CheckpointStatsTrackerTest {
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(
-                        0,
-                        mock(CheckpointCoordinatorConfiguration.class),
-                        new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
 
         PendingCheckpointStats pending =
                 tracker.reportPendingCheckpoint(
@@ -136,10 +102,7 @@ public class CheckpointStatsTrackerTest {
                 singletonMap(jobVertexID, jobVertex.getParallelism());
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(
-                        10,
-                        mock(CheckpointCoordinatorConfiguration.class),
-                        new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
 
         // Completed checkpoint
         PendingCheckpointStats completed1 =
@@ -254,10 +217,7 @@ public class CheckpointStatsTrackerTest {
     public void testCreateSnapshot() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(
-                        10,
-                        mock(CheckpointCoordinatorConfiguration.class),
-                        new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
 
         CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
 
@@ -313,7 +273,7 @@ public class CheckpointStatsTrackerTest {
                     }
                 };
 
-        new CheckpointStatsTracker(0, mock(CheckpointCoordinatorConfiguration.class), metricGroup);
+        new CheckpointStatsTracker(0, metricGroup);
 
         // Make sure this test is adjusted when further metrics are added
         assertTrue(
@@ -360,9 +320,7 @@ public class CheckpointStatsTrackerTest {
                         .build();
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
-        CheckpointStatsTracker stats =
-                new CheckpointStatsTracker(
-                        0, mock(CheckpointCoordinatorConfiguration.class), metricGroup);
+        CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup);
 
         // Make sure to adjust this test if metrics are added/removed
         assertEquals(10, registeredGauges.size());