You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/26 10:36:35 UTC
flink git commit: [FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph
Repository: flink
Updated Branches:
refs/heads/release-1.0 f80f6d602 -> 7062b0ae8
[FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph
Backport of 8330539.
`CheckpointStatsTracker` was instantiated in `ExecutionGraph#enableSnapshotCheckpointing`,
where only the job configuration is available, not the Flink configuration that holds the
checkpoint statistics settings. Instead of instantiating the `CheckpointStatsTracker` inside
that method, the `JobManager` now creates it from the Flink configuration and passes it to
`enableSnapshotCheckpointing` as an argument.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7062b0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7062b0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7062b0ae
Branch: refs/heads/release-1.0
Commit: 7062b0ae85c2885fcbe0f7bd0a9a20049530d1d9
Parents: f80f6d6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Apr 26 10:30:46 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Apr 26 10:31:02 2016 +0200
----------------------------------------------------------------------
.../stats/SimpleCheckpointStatsTracker.java | 19 ++++-----
.../runtime/executiongraph/ExecutionGraph.java | 37 +++++------------
.../flink/runtime/jobmanager/JobManager.scala | 20 ++++++++-
...ExecutionGraphCheckpointCoordinatorTest.java | 4 +-
.../stats/SimpleCheckpointStatsTrackerTest.java | 43 ++++++++++----------
5 files changed, 61 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 5ee4fc3..aea18e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import scala.Option;
@@ -108,22 +108,19 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
public SimpleCheckpointStatsTracker(
int historySize,
- ExecutionVertex[] tasksToWaitFor) {
+ List<ExecutionJobVertex> tasksToWaitFor) {
checkArgument(historySize >= 0);
this.historySize = historySize;
- // We know upfront, which tasks will ack the checkpoints.
- if (tasksToWaitFor != null && tasksToWaitFor.length > 0) {
- taskParallelism = new HashMap<>();
+ // We know upfront which tasks will ack the checkpoints
+ if (tasksToWaitFor != null && !tasksToWaitFor.isEmpty()) {
+ taskParallelism = new HashMap<>(tasksToWaitFor.size());
- for (ExecutionVertex vertex : tasksToWaitFor) {
- taskParallelism.put(
- vertex.getJobvertexId(),
- vertex.getTotalNumberOfParallelSubtasks());
+ for (ExecutionJobVertex vertex : tasksToWaitFor) {
+ taskParallelism.put(vertex.getJobVertexId(), vertex.getParallelism());
}
- }
- else {
+ } else {
taskParallelism = Collections.emptyMap();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ed50bea..7cb83cd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,16 +19,15 @@
package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorSystem;
-
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -38,17 +37,14 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
import org.apache.flink.runtime.checkpoint.StateStore;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.UnrecoverableException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
@@ -58,13 +54,11 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
@@ -73,14 +67,15 @@ import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -353,7 +348,8 @@ public class ExecutionGraph implements Serializable {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode,
- StateStore<CompletedCheckpoint> savepointStore) throws Exception {
+ StateStore<CompletedCheckpoint> savepointStore,
+ CheckpointStatsTracker statsTracker) throws Exception {
// simple sanity checks
if (interval < 10 || checkpointTimeout < 10) {
@@ -370,20 +366,7 @@ public class ExecutionGraph implements Serializable {
// disable to make sure existing checkpoint coordinators are cleared
disableSnaphotCheckpointing();
- boolean isStatsDisabled = jobConfiguration.getBoolean(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
- if (isStatsDisabled) {
- checkpointStatsTracker = new DisabledCheckpointStatsTracker();
- }
- else {
- int historySize = jobConfiguration.getInteger(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-
- checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
- }
+ checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5858171..cee5606 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.checkpoint._
+import org.apache.flink.runtime.checkpoint.stats.{SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker, CheckpointStatsTracker}
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.execution.UnrecoverableException
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -1057,6 +1058,22 @@ class JobManager(
val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
+ // Checkpoint stats tracker
+ val isStatsDisabled: Boolean = flinkConfiguration.getBoolean(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE)
+
+ val checkpointStatsTracker: CheckpointStatsTracker =
+ if (isStatsDisabled) {
+ new DisabledCheckpointStatsTracker()
+ } else {
+ val historySize: Int = flinkConfiguration.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
+
+ new SimpleCheckpointStatsTracker(historySize, ackVertices)
+ }
+
executionGraph.enableSnapshotCheckpointing(
snapshotSettings.getCheckpointInterval,
snapshotSettings.getCheckpointTimeout,
@@ -1070,7 +1087,8 @@ class JobManager(
checkpointIdCounter,
completedCheckpoints,
recoveryMode,
- savepointStore)
+ savepointStore,
+ checkpointStatsTracker)
}
// get notified about job status changes
http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5d83bf2..04dc46d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -70,7 +71,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
RecoveryMode.STANDALONE,
- new HeapStateStore<CompletedCheckpoint>());
+ new HeapStateStore<CompletedCheckpoint>(),
+ new DisabledCheckpointStatsTracker());
CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();
http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 56228ef..47d5011 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,11 +47,11 @@ import static org.mockito.Mockito.when;
public class SimpleCheckpointStatsTrackerTest {
private static final Random RAND = new Random();
-
+
@Test
public void testNoCompletedCheckpointYet() throws Exception {
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
- 0, new ExecutionVertex[0]);
+ 0, Collections.<ExecutionJobVertex>emptyList());
assertFalse(tracker.getJobStats().isDefined());
assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
@@ -59,7 +60,7 @@ public class SimpleCheckpointStatsTrackerTest {
@Test
public void testRandomStats() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
- ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+ List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
for (int i = 0; i < checkpoints.length; i++) {
@@ -75,7 +76,7 @@ public class SimpleCheckpointStatsTrackerTest {
@Test
public void testIllegalOperatorId() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
- ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+ List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
for (CompletedCheckpoint checkpoint : checkpoints) {
@@ -90,7 +91,7 @@ public class SimpleCheckpointStatsTrackerTest {
@Test
public void testCompletedCheckpointReordering() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
- ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+ List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
// First the second checkpoint notifies
@@ -110,7 +111,7 @@ public class SimpleCheckpointStatsTrackerTest {
@SuppressWarnings("unchecked")
public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
- ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+ List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
tracker.onCompletedCheckpoint(checkpoints[0]);
@@ -230,12 +231,12 @@ public class SimpleCheckpointStatsTrackerTest {
private static void verifySubtaskStats(
CheckpointStatsTracker tracker,
- ExecutionVertex[] tasksToWaitFor,
+ List<ExecutionJobVertex> tasksToWaitFor,
CompletedCheckpoint checkpoint) {
- for (ExecutionVertex vertex : tasksToWaitFor) {
- JobVertexID operatorId = vertex.getJobvertexId();
- int parallelism = vertex.getTotalNumberOfParallelSubtasks();
+ for (ExecutionJobVertex vertex : tasksToWaitFor) {
+ JobVertexID operatorId = vertex.getJobVertexId();
+ int parallelism = vertex.getParallelism();
OperatorCheckpointStats actualStats = tracker.getOperatorStats(operatorId).get();
@@ -341,7 +342,8 @@ public class SimpleCheckpointStatsTrackerTest {
return checkpoints;
}
- private ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint checkpoint) {
+ private List<ExecutionJobVertex> createTasksToWaitFor(CompletedCheckpoint checkpoint) {
+
Map<JobVertexID, Integer> operators = new HashMap<>();
for (StateForTask state : checkpoint.getStates()) {
@@ -355,17 +357,14 @@ public class SimpleCheckpointStatsTrackerTest {
}
}
- ExecutionVertex[] tasksToWaitFor = new ExecutionVertex[operators.size()];
-
- int i = 0;
- for (JobVertexID operatorId : operators.keySet()) {
- tasksToWaitFor[i] = mock(ExecutionVertex.class);
- when(tasksToWaitFor[i].getJobvertexId()).thenReturn(operatorId);
- when(tasksToWaitFor[i].getTotalNumberOfParallelSubtasks()).thenReturn(operators.get(operatorId));
-
- i++;
+ List<ExecutionJobVertex> jobVertices = new ArrayList<>(checkpoint.getStates().size());
+ for (JobVertexID vertexId : operators.keySet()) {
+ ExecutionJobVertex v = mock(ExecutionJobVertex.class);
+ when(v.getJobVertexId()).thenReturn(vertexId);
+ when(v.getParallelism()).thenReturn(operators.get(vertexId));
+ jobVertices.add(v);
}
- return tasksToWaitFor;
+ return jobVertices;
}
}