You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/26 10:36:35 UTC

flink git commit: [FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/release-1.0 f80f6d602 -> 7062b0ae8


[FLINK-3803] [runtime] Pass CheckpointStatsTracker to ExecutionGraph

Backport of 8330539.

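`CheckpointStatsTracker` was instantiated in `ExecutionGraph#enableSnapshotCheckpointing`,
where only the job configuration is available. The Flink configuration, which holds the
checkpoint statistics settings, is not accessible there, so those settings could not be read.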

Instead of instantiating the `CheckpointStatsTracker` in the `ExecutionGraph`
method, the `JobManager` now creates it from the Flink configuration and passes it
to `enableSnapshotCheckpointing` as an argument.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7062b0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7062b0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7062b0ae

Branch: refs/heads/release-1.0
Commit: 7062b0ae85c2885fcbe0f7bd0a9a20049530d1d9
Parents: f80f6d6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Apr 26 10:30:46 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Apr 26 10:31:02 2016 +0200

----------------------------------------------------------------------
 .../stats/SimpleCheckpointStatsTracker.java     | 19 ++++-----
 .../runtime/executiongraph/ExecutionGraph.java  | 37 +++++------------
 .../flink/runtime/jobmanager/JobManager.scala   | 20 ++++++++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  4 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java | 43 ++++++++++----------
 5 files changed, 61 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 5ee4fc3..aea18e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
@@ -108,22 +108,19 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
 	public SimpleCheckpointStatsTracker(
 			int historySize,
-			ExecutionVertex[] tasksToWaitFor) {
+			List<ExecutionJobVertex> tasksToWaitFor) {
 
 		checkArgument(historySize >= 0);
 		this.historySize = historySize;
 
-		// We know upfront, which tasks will ack the checkpoints.
-		if (tasksToWaitFor != null && tasksToWaitFor.length > 0) {
-			taskParallelism = new HashMap<>();
+		// We know upfront which tasks will ack the checkpoints
+		if (tasksToWaitFor != null && !tasksToWaitFor.isEmpty()) {
+			taskParallelism = new HashMap<>(tasksToWaitFor.size());
 
-			for (ExecutionVertex vertex : tasksToWaitFor) {
-				taskParallelism.put(
-						vertex.getJobvertexId(),
-						vertex.getTotalNumberOfParallelSubtasks());
+			for (ExecutionJobVertex vertex : tasksToWaitFor) {
+				taskParallelism.put(vertex.getJobVertexId(), vertex.getParallelism());
 			}
-		}
-		else {
+		} else {
 			taskParallelism = Collections.emptyMap();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ed50bea..7cb83cd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -38,17 +37,14 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.StateStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.UnrecoverableException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
@@ -58,13 +54,11 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -73,14 +67,15 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -353,7 +348,8 @@ public class ExecutionGraph implements Serializable {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			RecoveryMode recoveryMode,
-			StateStore<CompletedCheckpoint> savepointStore) throws Exception {
+			StateStore<CompletedCheckpoint> savepointStore,
+			CheckpointStatsTracker statsTracker) throws Exception {
 
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
@@ -370,20 +366,7 @@ public class ExecutionGraph implements Serializable {
 		// disable to make sure existing checkpoint coordinators are cleared
 		disableSnaphotCheckpointing();
 
-		boolean isStatsDisabled = jobConfiguration.getBoolean(
-				ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-		if (isStatsDisabled) {
-			checkpointStatsTracker = new DisabledCheckpointStatsTracker();
-		}
-		else {
-			int historySize = jobConfiguration.getInteger(
-					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-
-			checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
-		}
+		checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5858171..cee5606 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
+import org.apache.flink.runtime.checkpoint.stats.{SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker, CheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.UnrecoverableException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -1057,6 +1058,22 @@ class JobManager(
 
           val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
 
+          // Checkpoint stats tracker
+          val isStatsDisabled: Boolean = flinkConfiguration.getBoolean(
+            ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+            ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE)
+
+          val checkpointStatsTracker: CheckpointStatsTracker =
+            if (isStatsDisabled) {
+              new DisabledCheckpointStatsTracker()
+            } else {
+              val historySize: Int = flinkConfiguration.getInteger(
+                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+                ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
+
+              new SimpleCheckpointStatsTracker(historySize, ackVertices)
+            }
+
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
@@ -1070,7 +1087,8 @@ class JobManager(
             checkpointIdCounter,
             completedCheckpoints,
             recoveryMode,
-            savepointStore)
+            savepointStore,
+            checkpointStatsTracker)
         }
 
         // get notified about job status changes

http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5d83bf2..04dc46d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -70,7 +71,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
 					RecoveryMode.STANDALONE,
-					new HeapStateStore<CompletedCheckpoint>());
+					new HeapStateStore<CompletedCheckpoint>(),
+					new DisabledCheckpointStatsTracker());
 
 			CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 			SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();

http://git-wip-us.apache.org/repos/asf/flink/blob/7062b0ae/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 56228ef..47d5011 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.stats;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.StateForTask;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,11 +47,11 @@ import static org.mockito.Mockito.when;
 public class SimpleCheckpointStatsTrackerTest {
 
 	private static final Random RAND = new Random();
-	
+
 	@Test
 	public void testNoCompletedCheckpointYet() throws Exception {
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
-				0, new ExecutionVertex[0]);
+				0, Collections.<ExecutionJobVertex>emptyList());
 
 		assertFalse(tracker.getJobStats().isDefined());
 		assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
@@ -59,7 +60,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	@Test
 	public void testRandomStats() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
-		ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
 		for (int i = 0; i < checkpoints.length; i++) {
@@ -75,7 +76,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	@Test
 	public void testIllegalOperatorId() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
-		ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
 		for (CompletedCheckpoint checkpoint : checkpoints) {
@@ -90,7 +91,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	@Test
 	public void testCompletedCheckpointReordering() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
-		ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
 		// First the second checkpoint notifies
@@ -110,7 +111,7 @@ public class SimpleCheckpointStatsTrackerTest {
 	@SuppressWarnings("unchecked")
 	public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
 		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
-		ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
+		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
 		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
 
 		tracker.onCompletedCheckpoint(checkpoints[0]);
@@ -230,12 +231,12 @@ public class SimpleCheckpointStatsTrackerTest {
 
 	private static void verifySubtaskStats(
 			CheckpointStatsTracker tracker,
-			ExecutionVertex[] tasksToWaitFor,
+			List<ExecutionJobVertex> tasksToWaitFor,
 			CompletedCheckpoint checkpoint) {
 
-		for (ExecutionVertex vertex : tasksToWaitFor) {
-			JobVertexID operatorId = vertex.getJobvertexId();
-			int parallelism = vertex.getTotalNumberOfParallelSubtasks();
+		for (ExecutionJobVertex vertex : tasksToWaitFor) {
+			JobVertexID operatorId = vertex.getJobVertexId();
+			int parallelism = vertex.getParallelism();
 
 			OperatorCheckpointStats actualStats = tracker.getOperatorStats(operatorId).get();
 
@@ -341,7 +342,8 @@ public class SimpleCheckpointStatsTrackerTest {
 		return checkpoints;
 	}
 
-	private ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint checkpoint) {
+	private List<ExecutionJobVertex> createTasksToWaitFor(CompletedCheckpoint checkpoint) {
+
 		Map<JobVertexID, Integer> operators = new HashMap<>();
 
 		for (StateForTask state : checkpoint.getStates()) {
@@ -355,17 +357,14 @@ public class SimpleCheckpointStatsTrackerTest {
 			}
 		}
 
-		ExecutionVertex[] tasksToWaitFor = new ExecutionVertex[operators.size()];
-
-		int i = 0;
-		for (JobVertexID operatorId : operators.keySet()) {
-			tasksToWaitFor[i] = mock(ExecutionVertex.class);
-			when(tasksToWaitFor[i].getJobvertexId()).thenReturn(operatorId);
-			when(tasksToWaitFor[i].getTotalNumberOfParallelSubtasks()).thenReturn(operators.get(operatorId));
-
-			i++;
+		List<ExecutionJobVertex> jobVertices = new ArrayList<>(checkpoint.getStates().size());
+		for (JobVertexID vertexId : operators.keySet()) {
+			ExecutionJobVertex v = mock(ExecutionJobVertex.class);
+			when(v.getJobVertexId()).thenReturn(vertexId);
+			when(v.getParallelism()).thenReturn(operators.get(vertexId));
+			jobVertices.add(v);
 		}
 
-		return tasksToWaitFor;
+		return jobVertices;
 	}
 }