You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:48:47 UTC

[11/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint stats tracking

[FLINK-4410] [runtime] Rework checkpoint stats tracking


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d1f4bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d1f4bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d1f4bcb

Branch: refs/heads/release-1.2
Commit: 0d1f4bcbb37c5aa18f7bfcb886d3914b2f680bf0
Parents: 6ea77ed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 23 20:37:08 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Jan 10 09:47:55 2017 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  10 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |   2 +-
 .../checkpoint/AbstractCheckpointStats.java     | 192 ++++++++
 .../checkpoint/CheckpointCoordinator.java       |  43 +-
 .../checkpoint/CheckpointProperties.java        |  59 ++-
 .../checkpoint/CheckpointStatsCounts.java       | 184 ++++++++
 .../checkpoint/CheckpointStatsHistory.java      | 386 ++++++++++++++++
 .../checkpoint/CheckpointStatsSnapshot.java     | 102 +++++
 .../checkpoint/CheckpointStatsStatus.java       |  62 +++
 .../checkpoint/CheckpointStatsTracker.java      | 447 +++++++++++++++++++
 .../runtime/checkpoint/CompletedCheckpoint.java |  18 +
 .../checkpoint/CompletedCheckpointStats.java    | 174 ++++++++
 .../CompletedCheckpointStatsSummary.java        | 107 +++++
 .../checkpoint/FailedCheckpointStats.java       | 153 +++++++
 .../runtime/checkpoint/MinMaxAvgStats.java      | 130 ++++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  97 +++-
 .../checkpoint/PendingCheckpointStats.java      | 190 ++++++++
 .../checkpoint/RestoredCheckpointStats.java     | 103 +++++
 .../flink/runtime/checkpoint/SubtaskState.java  |  37 +-
 .../runtime/checkpoint/SubtaskStateStats.java   | 176 ++++++++
 .../runtime/checkpoint/TaskStateStats.java      | 277 ++++++++++++
 .../savepoint/SavepointV1Serializer.java        |  10 +-
 .../executiongraph/AccessExecutionGraph.java    |   2 +-
 .../AccessExecutionJobVertex.java               |  10 +-
 .../executiongraph/ArchivedExecutionGraph.java  |   7 +-
 .../ArchivedExecutionJobVertex.java             |  14 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  32 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  30 +-
 .../executiongraph/ExecutionJobVertex.java      |  13 -
 .../jobgraph/tasks/JobSnapshottingSettings.java |  17 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |  17 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 182 +++++---
 .../checkpoint/CheckpointPropertiesTest.java    |  27 ++
 .../checkpoint/CheckpointStateRestoreTest.java  |   6 +-
 .../checkpoint/CheckpointStatsCountsTest.java   | 153 +++++++
 .../checkpoint/CheckpointStatsHistoryTest.java  | 196 ++++++++
 .../checkpoint/CheckpointStatsStatusTest.java   |  48 ++
 .../checkpoint/CheckpointStatsTrackerTest.java  | 327 ++++++++++++++
 .../CompletedCheckpointStatsSummaryTest.java    | 105 +++++
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/CompletedCheckpointTest.java     |  25 ++
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   5 +-
 .../checkpoint/FailedCheckpointStatsTest.java   |  60 +++
 .../runtime/checkpoint/MinMaxAvgStatsTest.java  |  96 ++++
 .../checkpoint/PendingCheckpointStatsTest.java  | 256 +++++++++++
 .../checkpoint/PendingCheckpointTest.java       |  70 ++-
 .../checkpoint/RestoredCheckpointStatsTest.java |  49 ++
 .../checkpoint/SubtaskStateStatsTest.java       |  57 +++
 .../runtime/checkpoint/TaskStateStatsTest.java  |  93 ++++
 .../checkpoint/savepoint/SavepointV1Test.java   |   3 +-
 .../ArchivedExecutionGraphTest.java             | 146 ++----
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../runtime/jobmanager/JobManagerTest.java      |  12 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  15 +-
 .../messages/CheckpointMessagesTest.java        |   3 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  21 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  17 +-
 58 files changed, 4679 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 8a9d594..eabb754 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -588,7 +588,12 @@ public final class ConfigConstants {
 	/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
 	public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";
 
-	/** Flag to disable checkpoint stats. */
+	/**
+	 * Flag to disable checkpoint stats.
+	 *
+	 * @deprecated Checkpoint stats can no longer be disabled. Use a history size of 0 instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
 
 	/** Config parameter defining the number of checkpoints to remember for recent history. */
@@ -1226,7 +1231,8 @@ public final class ConfigConstants {
 	/** By default, submitting jobs from the web-frontend is allowed. */
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
 
-	/** Default flag to disable checkpoint stats. */
+	/** Default for the deprecated {@link #JOB_MANAGER_WEB_CHECKPOINTS_DISABLE} key, which no longer needs a default. */
+	@Deprecated
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
 
 	/** Default number of checkpoints to remember for recent history. */

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 3ae224f..b3b7dfc 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest {
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
-				500, 500, 50, 5, ExternalizedCheckpointSettings.none()));
+				500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));
 
 			flink.waitForActorsToBeAlive();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
new file mode 100644
index 0000000..6c261a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for checkpoint statistics.
+ */
+public abstract class AbstractCheckpointStats {
+
+	/** ID of this checkpoint. */
+	final long checkpointId;
+
+	/** Timestamp when the checkpoint was triggered at the coordinator. */
+	final long triggerTimestamp;
+
+	/** {@link TaskStateStats} keyed by their job vertex ID. */
+	final Map<JobVertexID, TaskStateStats> taskStats;
+
+	/** Total number of subtasks over all tasks. */
+	final int numberOfSubtasks;
+
+	/** Properties of the checkpoint. */
+	final CheckpointProperties props;
+
+	AbstractCheckpointStats(
+			long checkpointId,
+			long triggerTimestamp,
+			CheckpointProperties props,
+			int numberOfSubtasks,
+			Map<JobVertexID, TaskStateStats> taskStats) {
+		// Rejects null props/taskStats, an empty taskStats map and a non-positive numberOfSubtasks.
+		this.checkpointId = checkpointId;
+		this.triggerTimestamp = triggerTimestamp;
+		this.taskStats = checkNotNull(taskStats);
+		checkArgument(taskStats.size() > 0, "Empty task stats");
+		checkArgument(numberOfSubtasks > 0, "Non-positive number of subtasks");
+		this.numberOfSubtasks = numberOfSubtasks;
+		this.props = checkNotNull(props);
+	}
+
+	/**
+	 * Returns the status of this checkpoint.
+	 *
+	 * @return Status of this checkpoint
+	 */
+	public abstract CheckpointStatsStatus getStatus();
+
+	/**
+	 * Returns the number of acknowledged subtasks.
+	 *
+	 * @return The number of acknowledged subtasks.
+	 */
+	public abstract int getNumberOfAcknowledgedSubtasks();
+
+	/**
+	 * Returns the total checkpoint state size over all subtasks.
+	 *
+	 * @return Total checkpoint state size over all subtasks.
+	 */
+	public abstract long getStateSize();
+
+	/**
+	 * Returns the total buffered bytes during alignment over all subtasks.
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Total buffered bytes during alignment over all subtasks.
+	 */
+	public abstract long getAlignmentBuffered();
+
+	/**
+	 * Returns the latest acknowledged subtask stats or <code>null</code> if
+	 * none was acknowledged yet.
+	 *
+	 * @return Latest acknowledged subtask stats or <code>null</code>
+	 */
+	@Nullable
+	public abstract SubtaskStateStats getLatestAcknowledgedSubtaskStats();
+
+	/**
+	 * Returns the ID of this checkpoint.
+	 *
+	 * @return ID of this checkpoint.
+	 */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Returns the timestamp when the checkpoint was triggered.
+	 *
+	 * @return Timestamp when the checkpoint was triggered.
+	 */
+	public long getTriggerTimestamp() {
+		return triggerTimestamp;
+	}
+
+	/**
+	 * Returns the properties of this checkpoint.
+	 *
+	 * @return Properties of this checkpoint.
+	 */
+	public CheckpointProperties getProperties() {
+		return props;
+	}
+
+	/**
+	 * Returns the total number of subtasks involved in this checkpoint.
+	 *
+	 * @return Total number of subtasks involved in this checkpoint.
+	 */
+	public int getNumberOfSubtasks() {
+		return numberOfSubtasks;
+	}
+
+	/**
+	 * Returns the task state stats for the given job vertex ID or
+	 * <code>null</code> if no task with such an ID is available.
+	 *
+	 * @param jobVertexId Job vertex ID of the task stats to look up.
+	 * @return The task state stats instance for the given ID or <code>null</code>.
+	 */
+	public TaskStateStats getTaskStateStats(JobVertexID jobVertexId) {
+		return taskStats.get(jobVertexId);
+	}
+
+	/**
+	 * Returns all task state stats instances.
+	 *
+	 * @return All task state stats instances (a live view of the internal map's values, not a copy).
+	 */
+	public Collection<TaskStateStats> getAllTaskStateStats() {
+		return taskStats.values();
+	}
+
+	/**
+	 * Returns the ack timestamp of the latest acknowledged subtask or
+	 * <code>-1</code> if none was acknowledged yet.
+	 *
+	 * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code>.
+	 */
+	public long getLatestAckTimestamp() {
+		SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+		if (subtask != null) {
+			return subtask.getAckTimestamp();
+		} else {
+			return -1;
+		}
+	}
+
+	/**
+	 * Returns the duration of this checkpoint, i.e. the time from triggering
+	 * until the latest subtask acknowledgement (clamped to be non-negative),
+	 * or <code>-1</code> if no subtask was acknowledged yet.
+	 *
+	 * @return Duration of this checkpoint or <code>-1</code> if no subtask was acknowledged yet.
+	 */
+	public long getEndToEndDuration() {
+		SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+		if (subtask != null) {
+			return Math.max(0, subtask.getAckTimestamp() - triggerTimestamp);
+		} else {
+			return -1;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3ce7a5a..9132897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -40,6 +39,7 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -147,8 +147,9 @@ public class CheckpointCoordinator {
 	/** Flag marking the coordinator as shut down (not accepting any messages any more) */
 	private volatile boolean shutdown;
 
-	/** Helper for tracking checkpoint statistics  */
-	private final CheckpointStatsTracker statsTracker;
+	/** Optional tracker for checkpoint statistics. */
+	@Nullable
+	private CheckpointStatsTracker statsTracker;
 
 	/** Default checkpoint properties **/
 	private final CheckpointProperties checkpointProperties;
@@ -170,7 +171,6 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			String checkpointDirectory,
-			CheckpointStatsTracker statsTracker,
 			Executor executor) {
 
 		// sanity checks
@@ -209,7 +209,6 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
-		this.statsTracker = checkNotNull(statsTracker);
 
 		this.timer = new Timer("Checkpoint Timer", true);
 
@@ -231,6 +230,15 @@ public class CheckpointCoordinator {
 		this.executor = checkNotNull(executor);
 	}
 
+	/**
+	 * Sets the checkpoint stats tracker; when <code>null</code>, pending and restored checkpoints are not reported.
+	 * <p>NOTE(review): the field is neither final nor volatile — presumably set before any checkpoint is triggered; confirm.
+	 * @param statsTracker The checkpoint stats tracker or <code>null</code>.
+	 */
+	public void setCheckpointStatsTracker(@Nullable CheckpointStatsTracker statsTracker) {
+		this.statsTracker = statsTracker;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Clean shutdown
 	// --------------------------------------------------------------------------------------------
@@ -428,11 +436,19 @@ public class CheckpointCoordinator {
 				checkpointID,
 				timestamp,
 				ackTasks,
-				isPeriodic,
 				props,
 				targetDirectory,
 				executor);
 
+			if (statsTracker != null) {
+				PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
+					checkpointID,
+					timestamp,
+					props);
+
+				checkpoint.setStatsCallback(callback);
+			}
+
 			// schedule the timer that will clean up the expired checkpoints
 			TimerTask canceller = new TimerTask() {
 				@Override
@@ -632,7 +648,7 @@ public class CheckpointCoordinator {
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 
-				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
+				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) {
 					case SUCCESS:
 						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
 							checkpointId, message.getTaskExecutionId(), message.getJob());
@@ -769,8 +785,6 @@ public class CheckpointCoordinator {
 				ee.notifyCheckpointComplete(checkpointId, timestamp);
 			}
 		}
-
-		statsTracker.onCompletedCheckpoint(completedCheckpoint);
 	}
 
 	private void rememberRecentCheckpointId(long id) {
@@ -876,6 +890,17 @@ public class CheckpointCoordinator {
 
 			stateAssignmentOperation.assignStates();
 
+			if (statsTracker != null) {
+				long restoreTimestamp = System.currentTimeMillis();
+				RestoredCheckpointStats restored = new RestoredCheckpointStats(
+					latest.getCheckpointID(),
+					latest.getProperties(),
+					restoreTimestamp,
+					latest.getExternalPath());
+
+				statsTracker.reportRestoredCheckpoint(restored);
+			}
+
 			return true;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 68a4998..4d8bab2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -179,7 +179,6 @@ public class CheckpointProperties implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -227,6 +226,42 @@ public class CheckpointProperties implements Serializable {
 
 	// ------------------------------------------------------------------------
 
+	private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties( // Returned by forStandardSavepoint()
+			true,
+			true,
+			false,
+			false,
+			false,
+			false,
+			false);
+
+	private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( // Returned by forStandardCheckpoint()
+			false,
+			false,
+			true,
+			true,
+			true,
+			true,
+			true);
+
+	private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( // Returned by forExternalizedCheckpoint(false)
+			false,
+			true,
+			true,
+			true,
+			false, // Retain on cancellation
+			false,
+			false); // Retain on suspension (handled like cancellation: a graceful non-HA cluster shutdown suspends all jobs)
+
+	private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( // Returned by forExternalizedCheckpoint(true)
+			false,
+			true,
+			true,
+			true,
+			true, // Delete on cancellation
+			false,
+			true); // Delete on suspension (handled like cancellation: a graceful non-HA cluster shutdown suspends all jobs)
+
 	/**
 	 * Creates the checkpoint properties for a (manually triggered) savepoint.
 	 *
@@ -236,7 +271,7 @@ public class CheckpointProperties implements Serializable {
 	 * @return Checkpoint properties for a (manually triggered) savepoint.
 	 */
 	public static CheckpointProperties forStandardSavepoint() {
-		return new CheckpointProperties(true, true, false, false, false, false, false);
+		return STANDARD_SAVEPOINT;
 	}
 
 	/**
@@ -248,7 +283,7 @@ public class CheckpointProperties implements Serializable {
 	 * @return Checkpoint properties for a regular checkpoint.
 	 */
 	public static CheckpointProperties forStandardCheckpoint() {
-		return new CheckpointProperties(false, false, true, true, true, true, true);
+		return STANDARD_CHECKPOINT;
 	}
 
 	/**
@@ -264,8 +299,20 @@ public class CheckpointProperties implements Serializable {
 	 * @return Checkpoint properties for an external checkpoint.
 	 */
 	public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
-		// Handle suspension like cancellation as graceful cluster shut down
-		// suspends all jobs (non-HA).
-		return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation);
+		if (deleteOnCancellation) {
+			return EXTERNALIZED_CHECKPOINT_DELETED;
+		} else {
+			return EXTERNALIZED_CHECKPOINT_RETAINED;
+		}
+	}
+
+	/**
+	 * Returns whether the given properties are equal to those of {@link #forStandardSavepoint()}.
+	 *
+	 * @param props Checkpoint properties to check.
+	 * @return <code>true</code> if the properties describe a standard savepoint, <code>false</code> otherwise.
+	 */
+	public static boolean isSavepoint(CheckpointProperties props) {
+		return STANDARD_SAVEPOINT.equals(props);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
new file mode 100644
index 0000000..dad45eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Counts of checkpoints.
+ */
+public class CheckpointStatsCounts implements Serializable {
+
+	private static final long serialVersionUID = -5229425063269482528L;
+
+	/** Number of restored checkpoints. */
+	private long numRestoredCheckpoints;
+
+	/** Number of total checkpoints (in progress, completed, failed). */
+	private long numTotalCheckpoints;
+
+	/** Number of in progress checkpoints. */
+	private int numInProgressCheckpoints;
+
+	/** Number of successfully completed checkpoints. */
+	private long numCompletedCheckpoints;
+
+	/** Number of failed checkpoints. */
+	private long numFailedCheckpoints;
+
+	/**
+	 * Creates the initial zero checkpoint counts.
+	 */
+	CheckpointStatsCounts() {
+		this(0, 0, 0, 0, 0);
+	}
+
+	/**
+	 * Creates the checkpoint counts with the given counts.
+	 *
+	 * @param numRestoredCheckpoints Number of restored checkpoints.
+	 * @param numTotalCheckpoints Number of total checkpoints (in progress, completed, failed).
+	 * @param numInProgressCheckpoints Number of in progress checkpoints.
+	 * @param numCompletedCheckpoints Number of successfully completed checkpoints.
+	 * @param numFailedCheckpoints Number of failed checkpoints.
+	 */
+	private CheckpointStatsCounts(
+			long numRestoredCheckpoints,
+			long numTotalCheckpoints,
+			int numInProgressCheckpoints,
+			long numCompletedCheckpoints,
+			long numFailedCheckpoints) {
+
+		checkArgument(numRestoredCheckpoints >= 0, "Negative number of restored checkpoints");
+		checkArgument(numTotalCheckpoints >= 0, "Negative total number of checkpoints");
+		checkArgument(numInProgressCheckpoints >= 0, "Negative number of in progress checkpoints");
+		checkArgument(numCompletedCheckpoints >= 0, "Negative number of completed checkpoints");
+		checkArgument(numFailedCheckpoints >= 0, "Negative number of failed checkpoints");
+
+		this.numRestoredCheckpoints = numRestoredCheckpoints;
+		this.numTotalCheckpoints = numTotalCheckpoints;
+		this.numInProgressCheckpoints = numInProgressCheckpoints;
+		this.numCompletedCheckpoints = numCompletedCheckpoints;
+		this.numFailedCheckpoints = numFailedCheckpoints;
+	}
+
+	/**
+	 * Returns the number of restored checkpoints.
+	 *
+	 * @return Number of restored checkpoints.
+	 */
+	public long getNumberOfRestoredCheckpoints() {
+		return numRestoredCheckpoints;
+	}
+
+	/**
+	 * Returns the total number of checkpoints (in progress, completed, failed).
+	 *
+	 * @return Total number of checkpoints.
+	 */
+	public long getTotalNumberOfCheckpoints() {
+		return numTotalCheckpoints;
+	}
+
+	/**
+	 * Returns the number of in progress checkpoints.
+	 *
+	 * @return Number of in progress checkpoints.
+	 */
+	public int getNumberOfInProgressCheckpoints() {
+		return numInProgressCheckpoints;
+	}
+
+	/**
+	 * Returns the number of completed checkpoints.
+	 *
+	 * @return Number of completed checkpoints.
+	 */
+	public long getNumberOfCompletedCheckpoints() {
+		return numCompletedCheckpoints;
+	}
+
+	/**
+	 * Returns the number of failed checkpoints.
+	 *
+	 * @return Number of failed checkpoints.
+	 */
+	public long getNumberOfFailedCheckpoints() {
+		return numFailedCheckpoints;
+	}
+
+	/**
+	 * Increments the number of restored checkpoints.
+	 */
+	void incrementRestoredCheckpoints() {
+		numRestoredCheckpoints++;
+	}
+
+	/**
+	 * Increments the number of total and in progress checkpoints.
+	 */
+	void incrementInProgressCheckpoints() {
+		numInProgressCheckpoints++;
+		numTotalCheckpoints++;
+	}
+
+	/**
+	 * Increments the number of successfully completed checkpoints and decrements
+	 * the in progress count. Must follow {@link #incrementInProgressCheckpoints()};
+	 * otherwise an {@link IllegalStateException} is thrown and no count changes.
+	 */
+	void incrementCompletedCheckpoints() {
+		if (numInProgressCheckpoints <= 0) {
+			throw new IllegalStateException("Incremented the completed number of checkpoints " +
+				"without incrementing the in progress checkpoints before.");
+		}
+		numInProgressCheckpoints--;
+		numCompletedCheckpoints++;
+	}
+
+	/**
+	 * Increments the number of failed checkpoints and decrements the in
+	 * progress count. Must follow {@link #incrementInProgressCheckpoints()};
+	 * otherwise an {@link IllegalStateException} is thrown and no count changes.
+	 */
+	void incrementFailedCheckpoints() {
+		if (numInProgressCheckpoints <= 0) {
+			throw new IllegalStateException("Incremented the failed number of checkpoints " +
+				"without incrementing the in progress checkpoints before.");
+		}
+		numInProgressCheckpoints--;
+		numFailedCheckpoints++;
+	}
+
+	/**
+	 * Creates a snapshot of the current state.
+	 *
+	 * @return Snapshot of the current state.
+	 */
+	CheckpointStatsCounts createSnapshot() {
+		return new CheckpointStatsCounts(
+			numRestoredCheckpoints,
+			numTotalCheckpoints,
+			numInProgressCheckpoints,
+			numCompletedCheckpoints,
+			numFailedCheckpoints);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
new file mode 100644
index 0000000..56fc9c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An array based history of checkpoint stats.
+ *
+ * <p>The size of the array is constrained by the maximum allowed size. The
+ * array starts empty and grows with each added checkpoint until it reaches
+ * the maximum number of elements. At this point, the elements wrap around
+ * and the least recently added entry is overwritten.
+ *
+ * <p>Access happens via an iterable over the statistics and a map that
+ * exposes the checkpoints by their ID. Both of these are only guaranteed
+ * to reflect the latest state after a call to {@link #createSnapshot()}.
+ *
+ * <p>Furthermore the history tracks the latest completed and latest failed
+ * checkpoint as well as the latest savepoint. A completed savepoint is only
+ * ever considered for the latest savepoint, never for the latest completed
+ * checkpoint.
+ *
+ * <p>Snapshots created via {@link #createSnapshot()} are read-only; all
+ * mutating methods throw an {@link UnsupportedOperationException} on them.
+ */
+public class CheckpointStatsHistory implements Serializable {
+
+	private static final long serialVersionUID = 7090320677606528415L;
+
+	/** Iterable over all available stats. Only updated on {@link #createSnapshot()}. */
+	private final Iterable<AbstractCheckpointStats> checkpointsIterable;
+
+	/** Map of all available stats keyed by their ID. Only updated on {@link #createSnapshot()}. */
+	private final Map<Long, AbstractCheckpointStats> checkpointsById;
+
+	/** Maximum array size. */
+	private final int maxSize;
+
+	/** Flag indicating whether this history is read-only. */
+	private final boolean readOnly;
+
+	/** Array of checkpoint stats. Writes go against this array. */
+	private transient AbstractCheckpointStats[] checkpointsArray;
+
+	/** Next position in {@link #checkpointsArray} to write to. */
+	private transient int nextPos;
+
+	/** The latest successfully completed checkpoint. */
+	@Nullable
+	private CompletedCheckpointStats latestCompletedCheckpoint;
+
+	/** The latest failed checkpoint. */
+	@Nullable
+	private FailedCheckpointStats latestFailedCheckpoint;
+
+	/** The latest successfully completed savepoint. */
+	@Nullable
+	private CompletedCheckpointStats latestSavepoint;
+
+	/**
+	 * Creates a writeable checkpoint history with the given maximum size.
+	 *
+	 * <p>The read views are only updated on calls to {@link #createSnapshot()}.
+	 * Initially they are empty.
+	 *
+	 * @param maxSize Maximum history size.
+	 */
+	CheckpointStatsHistory(int maxSize) {
+		this(
+			false,
+			maxSize,
+			new AbstractCheckpointStats[0],
+			Collections.<AbstractCheckpointStats>emptyList(),
+			Collections.<Long, AbstractCheckpointStats>emptyMap(),
+			null,
+			null,
+			null);
+	}
+
+	/**
+	 * Creates a checkpoint history with the given maximum size and state.
+	 *
+	 * @param readOnly Flag indicating whether the history is read-only.
+	 * @param maxSize Maximum history size.
+	 * @param checkpointArray Array to write to; <code>null</code> for read-only snapshots.
+	 * @param checkpointsIterable Checkpoints iterable.
+	 * @param checkpointsById Checkpoints by ID.
+	 * @param latestCompletedCheckpoint The latest completed checkpoint or <code>null</code>.
+	 * @param latestFailedCheckpoint The latest failed checkpoint or <code>null</code>.
+	 * @param latestSavepoint The latest completed savepoint or <code>null</code>.
+	 */
+	private CheckpointStatsHistory(
+			boolean readOnly,
+			int maxSize,
+			@Nullable AbstractCheckpointStats[] checkpointArray,
+			Iterable<AbstractCheckpointStats> checkpointsIterable,
+			Map<Long, AbstractCheckpointStats> checkpointsById,
+			@Nullable CompletedCheckpointStats latestCompletedCheckpoint,
+			@Nullable FailedCheckpointStats latestFailedCheckpoint,
+			@Nullable CompletedCheckpointStats latestSavepoint) {
+
+		this.readOnly = readOnly;
+		checkArgument(maxSize >= 0, "Negative maximum size");
+		this.maxSize = maxSize;
+		this.checkpointsArray = checkpointArray;
+		this.checkpointsIterable = checkNotNull(checkpointsIterable);
+		this.checkpointsById = checkNotNull(checkpointsById);
+		this.latestCompletedCheckpoint = latestCompletedCheckpoint;
+		this.latestFailedCheckpoint = latestFailedCheckpoint;
+		this.latestSavepoint = latestSavepoint;
+	}
+
+	/**
+	 * Returns the checkpoints of this history in reverse insertion order
+	 * (most recently added first).
+	 *
+	 * @return Iterable over the checkpoints of this history.
+	 */
+	public Iterable<AbstractCheckpointStats> getCheckpoints() {
+		return checkpointsIterable;
+	}
+
+	/**
+	 * Returns the checkpoint with the given ID.
+	 *
+	 * @param checkpointId ID of the checkpoint to look up.
+	 * @return The checkpoint or <code>null</code> if it is not part of this history.
+	 */
+	@Nullable
+	public AbstractCheckpointStats getCheckpointById(long checkpointId) {
+		return checkpointsById.get(checkpointId);
+	}
+
+	@Nullable
+	public CompletedCheckpointStats getLatestCompletedCheckpoint() {
+		return latestCompletedCheckpoint;
+	}
+
+	@Nullable
+	public FailedCheckpointStats getLatestFailedCheckpoint() {
+		return latestFailedCheckpoint;
+	}
+
+	@Nullable
+	public CompletedCheckpointStats getLatestSavepoint() {
+		return latestSavepoint;
+	}
+
+	/**
+	 * Creates a read-only snapshot of the current state.
+	 *
+	 * @return Snapshot of the current state.
+	 * @throws UnsupportedOperationException If this history is read-only.
+	 */
+	CheckpointStatsHistory createSnapshot() {
+		if (readOnly) {
+			throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
+		}
+
+		Iterable<AbstractCheckpointStats> checkpointsIterable;
+		Map<Long, AbstractCheckpointStats> checkpointsById;
+
+		checkpointsById = new HashMap<>(checkpointsArray.length);
+
+		if (maxSize == 0) {
+			checkpointsIterable = Collections.emptyList();
+		} else {
+			// Create snapshot iterator (copies the array)
+			checkpointsIterable = new CheckpointsStatsHistoryIterable(checkpointsArray, nextPos);
+
+			for (AbstractCheckpointStats checkpoint : checkpointsIterable) {
+				checkpointsById.put(checkpoint.getCheckpointId(), checkpoint);
+			}
+		}
+
+		// The latest checkpoints may already have been overwritten in the
+		// array, but they must still be accessible by ID.
+		if (latestCompletedCheckpoint != null) {
+			checkpointsById.put(latestCompletedCheckpoint.getCheckpointId(), latestCompletedCheckpoint);
+		}
+
+		if (latestFailedCheckpoint != null) {
+			checkpointsById.put(latestFailedCheckpoint.getCheckpointId(), latestFailedCheckpoint);
+		}
+
+		if (latestSavepoint != null) {
+			checkpointsById.put(latestSavepoint.getCheckpointId(), latestSavepoint);
+		}
+
+		return new CheckpointStatsHistory(
+			true,
+			maxSize,
+			null,
+			checkpointsIterable,
+			checkpointsById,
+			latestCompletedCheckpoint,
+			latestFailedCheckpoint,
+			latestSavepoint);
+	}
+
+	/**
+	 * Adds an in progress checkpoint to the checkpoint history.
+	 *
+	 * @param pending In progress checkpoint to add.
+	 * @throws UnsupportedOperationException If this history is read-only.
+	 */
+	void addInProgressCheckpoint(PendingCheckpointStats pending) {
+		if (readOnly) {
+			throw new UnsupportedOperationException("Can't add a checkpoint to a read-only history.");
+		}
+
+		if (maxSize == 0) {
+			return;
+		}
+
+		checkNotNull(pending, "Pending checkpoint");
+
+		// Grow the array if required. This happens only for the first entries
+		// and makes the iterator logic easier, because we don't have any
+		// null elements with the growing array.
+		if (checkpointsArray.length < maxSize) {
+			checkpointsArray = Arrays.copyOf(checkpointsArray, checkpointsArray.length + 1);
+		}
+
+		// Wrap around if we are at the end. The next pos is the least recently
+		// added checkpoint.
+		if (nextPos == checkpointsArray.length) {
+			nextPos = 0;
+		}
+
+		checkpointsArray[nextPos++] = pending;
+	}
+
+	/**
+	 * Searches for the in progress checkpoint with the given ID and replaces
+	 * it with the given completed or failed checkpoint.
+	 *
+	 * <p>The latest completed checkpoint, latest savepoint, and latest failed
+	 * checkpoint are updated regardless of whether the in progress checkpoint
+	 * is still part of the array.
+	 *
+	 * <p>This is bounded by the maximum number of concurrent in progress
+	 * checkpoints, which means that the runtime of this is constant.
+	 *
+	 * @param completedOrFailed The completed or failed checkpoint to replace the in progress checkpoint with.
+	 * @return <code>true</code> if the checkpoint was replaced or <code>false</code> otherwise.
+	 * @throws IllegalArgumentException If the given checkpoint is in progress.
+	 * @throws UnsupportedOperationException If this history is read-only.
+	 */
+	boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed) {
+		checkArgument(!completedOrFailed.getStatus().isInProgress(), "Not allowed to replace with in progress checkpoints.");
+
+		if (readOnly) {
+			throw new UnsupportedOperationException("Can't replace a checkpoint in a read-only history.");
+		}
+
+		// Update the latest checkpoint stats. A savepoint only ever updates the
+		// latest savepoint; an outdated savepoint must not be reported as the
+		// latest completed checkpoint.
+		if (completedOrFailed.getStatus().isCompleted()) {
+			CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed;
+			if (CheckpointProperties.isSavepoint(completed.getProperties())) {
+				if (latestSavepoint == null ||
+					completed.getCheckpointId() > latestSavepoint.getCheckpointId()) {
+
+					latestSavepoint = completed;
+				}
+			} else if (latestCompletedCheckpoint == null ||
+				completed.getCheckpointId() > latestCompletedCheckpoint.getCheckpointId()) {
+
+				latestCompletedCheckpoint = completed;
+			}
+		} else if (completedOrFailed.getStatus().isFailed()) {
+			FailedCheckpointStats failed = (FailedCheckpointStats) completedOrFailed;
+			if (latestFailedCheckpoint == null ||
+				failed.getCheckpointId() > latestFailedCheckpoint.getCheckpointId()) {
+
+				latestFailedCheckpoint = failed;
+			}
+		}
+
+		if (maxSize == 0) {
+			return false;
+		}
+
+		long checkpointId = completedOrFailed.getCheckpointId();
+
+		// We start searching from the last inserted position. Since the entries
+		// wrap around the array we search until we are at index 0 and then from
+		// the end of the array until (start pos + 1).
+		int startPos = nextPos == checkpointsArray.length ? checkpointsArray.length - 1 : nextPos - 1;
+
+		for (int i = startPos; i >= 0; i--) {
+			if (checkpointsArray[i].getCheckpointId() == checkpointId) {
+				checkpointsArray[i] = completedOrFailed;
+				return true;
+			}
+		}
+
+		for (int i = checkpointsArray.length - 1; i > startPos; i--) {
+			if (checkpointsArray[i].getCheckpointId() == checkpointId) {
+				checkpointsArray[i] = completedOrFailed;
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	/**
+	 * Iterable over the current checkpoint history.
+	 *
+	 * <p>The iteration order is in reverse insertion order.
+	 *
+	 * <p>This is serializable, because it is part of the serializable history
+	 * snapshots.
+	 */
+	private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {
+
+		private static final long serialVersionUID = 726376482426055490L;
+
+		/** Copy of the checkpoints array at the point when this iterable was created. */
+		private final AbstractCheckpointStats[] checkpointsArray;
+
+		/** The starting position from which to iterate over the array. */
+		private final int startPos;
+
+		/**
+		 * Creates the iterable by creating a copy of the checkpoints array.
+		 *
+		 * @param checkpointsArray Checkpoints to iterate over. This array is copied.
+		 * @param nextPos The next write position for the array
+		 */
+		CheckpointsStatsHistoryIterable(AbstractCheckpointStats[] checkpointsArray, int nextPos) {
+			// Copy the array
+			this.checkpointsArray = Arrays.copyOf(checkpointsArray, checkpointsArray.length);
+
+			// Start from (nextPos - 1), because that's where the most recently
+			// added element is. The iterator walks backwards from there.
+			this.startPos = nextPos == checkpointsArray.length ? checkpointsArray.length - 1 : nextPos - 1;
+		}
+
+		@Override
+		public Iterator<AbstractCheckpointStats> iterator() {
+			return new CheckpointsSnapshotIterator();
+		}
+
+		/**
+		 * Iterator over the checkpoints array.
+		 */
+		private class CheckpointsSnapshotIterator implements Iterator<AbstractCheckpointStats> {
+
+			/** The current position. */
+			private int currentPos;
+
+			/** The remaining number of elements to iterate over. */
+			private int remaining;
+
+			/**
+			 * Creates the iterator.
+			 */
+			CheckpointsSnapshotIterator() {
+				this.currentPos = startPos;
+				this.remaining = checkpointsArray.length;
+			}
+
+			@Override
+			public boolean hasNext() {
+				return remaining > 0;
+			}
+
+			@Override
+			public AbstractCheckpointStats next() {
+				if (hasNext()) {
+					AbstractCheckpointStats stats = checkpointsArray[currentPos--];
+
+					// Wrap around if needed
+					if (currentPos == -1) {
+						currentPos = checkpointsArray.length - 1;
+					}
+
+					remaining--;
+
+					return stats;
+				} else {
+					throw new NoSuchElementException();
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
new file mode 100644
index 0000000..e0bfed7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A snapshot of the checkpoint stats.
+ *
+ * <p>Bundles snapshots of the checkpoint counts, the completed checkpoint
+ * summary stats and the checkpoint history together with the latest
+ * restored checkpoint (if any).
+ */
+public class CheckpointStatsSnapshot implements Serializable {
+
+	private static final long serialVersionUID = 8914278419087217964L;
+
+	/** Snapshot of the checkpoint counts. */
+	private final CheckpointStatsCounts counts;
+
+	/** Snapshot of the completed checkpoints summary stats. */
+	private final CompletedCheckpointStatsSummary summary;
+
+	/** Snapshot of the checkpoint history. */
+	private final CheckpointStatsHistory history;
+
+	/** The latest restored checkpoint operation. */
+	@Nullable
+	private final RestoredCheckpointStats latestRestoredCheckpoint;
+
+	/**
+	 * Creates a stats snapshot.
+	 *
+	 * @param counts Snapshot of the checkpoint counts.
+	 * @param summary Snapshot of the completed checkpoints summary stats.
+	 * @param history Snapshot of the checkpoint history.
+	 * @param latestRestoredCheckpoint The latest restored checkpoint operation or <code>null</code>.
+	 */
+	CheckpointStatsSnapshot(
+			CheckpointStatsCounts counts,
+			CompletedCheckpointStatsSummary summary,
+			CheckpointStatsHistory history,
+			@Nullable RestoredCheckpointStats latestRestoredCheckpoint) {
+
+		this.counts = checkNotNull(counts);
+		this.summary = checkNotNull(summary);
+		this.history = checkNotNull(history);
+		this.latestRestoredCheckpoint = latestRestoredCheckpoint;
+	}
+
+	/**
+	 * Returns the snapshotted checkpoint counts.
+	 *
+	 * @return Snapshotted checkpoint counts.
+	 */
+	public CheckpointStatsCounts getCounts() {
+		return counts;
+	}
+
+	/**
+	 * Returns the snapshotted completed checkpoint summary stats.
+	 *
+	 * @return Snapshotted completed checkpoint summary stats.
+	 */
+	public CompletedCheckpointStatsSummary getSummaryStats() {
+		return summary;
+	}
+
+	/**
+	 * Returns the snapshotted checkpoint history.
+	 *
+	 * @return Snapshotted checkpoint history.
+	 */
+	public CheckpointStatsHistory getHistory() {
+		return history;
+	}
+
+	/**
+	 * Returns the latest restored checkpoint.
+	 *
+	 * @return Latest restored checkpoint or <code>null</code>.
+	 */
+	@Nullable
+	public RestoredCheckpointStats getLatestRestoredCheckpoint() {
+		return latestRestoredCheckpoint;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
new file mode 100644
index 0000000..670a6e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Lifecycle state of a checkpoint as seen by the stats tracking.
+ *
+ * <p>A tracked checkpoint starts out {@link #IN_PROGRESS} and is later
+ * replaced by either a {@link #COMPLETED} or a {@link #FAILED} one.
+ */
+public enum CheckpointStatsStatus {
+
+	/** The checkpoint has been triggered and has not finished yet. */
+	IN_PROGRESS,
+
+	/** The checkpoint finished successfully. */
+	COMPLETED,
+
+	/** The checkpoint did not finish successfully. */
+	FAILED;
+
+	/**
+	 * Checks for the {@link #IN_PROGRESS} state.
+	 *
+	 * @return <code>true</code> iff this is {@link #IN_PROGRESS}.
+	 */
+	public boolean isInProgress() {
+		return IN_PROGRESS == this;
+	}
+
+	/**
+	 * Checks for the {@link #COMPLETED} state.
+	 *
+	 * @return <code>true</code> iff this is {@link #COMPLETED}.
+	 */
+	public boolean isCompleted() {
+		return COMPLETED == this;
+	}
+
+	/**
+	 * Checks for the {@link #FAILED} state.
+	 *
+	 * @return <code>true</code> iff this is {@link #FAILED}.
+	 */
+	public boolean isFailed() {
+		return FAILED == this;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
new file mode 100644
index 0000000..92f707f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker for checkpoint statistics.
+ *
+ * <p>This is tightly integrated with the {@link CheckpointCoordinator} in
+ * order to ease the gathering of fine-grained statistics.
+ *
+ * <p>The tracked stats include summary counts, a detailed history of recent
+ * and in progress checkpoints as well as summaries about the size, duration
+ * and more of recent checkpoints.
+ *
+ * <p>Data is gathered via callbacks in the {@link CheckpointCoordinator} and
+ * related classes like {@link PendingCheckpoint} and {@link CompletedCheckpoint},
+ * which receive the raw stats data in the first place.
+ *
+ * <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
+ * both the web frontend and the {@link Metric} system.
+ */
+public class CheckpointStatsTracker implements Serializable {
+
+	private static final long serialVersionUID = 1694085244807339288L;
+
+	/**
+	 * Lock used to update stats and creating snapshots. Updates always happen
+	 * from a single Thread at a time and there can be multiple concurrent read
+	 * accesses to the latest stats snapshot.
+	 *
+	 * Currently, writes are executed by whatever Thread executes the coordinator
+	 * actions (which already happens in locked scope). Reads can come from
+	 * multiple concurrent Netty event loop Threads of the web runtime monitor.
+	 */
+	private final ReentrantLock statsReadWriteLock = new ReentrantLock();
+
+	/** The job vertices taking part in the checkpoints. */
+	private final List<ExecutionJobVertex> jobVertices;
+
+	/** Total number of subtasks to checkpoint. */
+	private final int totalSubtaskCount;
+
+	/** Snapshotting settings created from the CheckpointConfig. */
+	private final JobSnapshottingSettings jobSnapshottingSettings;
+
+	/** Checkpoint counts. */
+	private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
+
+	/** A summary of the completed checkpoint stats. */
+	private final CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary();
+
+	/** History of checkpoints. */
+	private final CheckpointStatsHistory history;
+
+	/** The latest restored checkpoint. */
+	@Nullable
+	private RestoredCheckpointStats latestRestoredCheckpoint;
+
+	/** Latest created snapshot. */
+	private volatile CheckpointStatsSnapshot latestSnapshot;
+
+	/**
+	 * Flag indicating whether a new snapshot needs to be created. This is true
+	 * if a new checkpoint was triggered or updated (completed successfully or
+	 * failed).
+	 */
+	private volatile boolean dirty;
+
+	/**
+	 * Creates a new checkpoint stats tracker.
+	 *
+	 * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in progress ones.
+	 * @param jobVertices Job vertices involved in the checkpoints.
+	 * @param jobSnapshottingSettings Snapshotting settings created from the CheckpointConfig.
+	 * @param metricGroup Metric group for exposed metrics
+	 */
+	public CheckpointStatsTracker(
+		int numRememberedCheckpoints,
+		List<ExecutionJobVertex> jobVertices,
+		JobSnapshottingSettings jobSnapshottingSettings,
+		MetricGroup metricGroup) {
+
+		checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
+		this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
+		this.jobVertices = checkNotNull(jobVertices, "JobVertices");
+		this.jobSnapshottingSettings = checkNotNull(jobSnapshottingSettings);
+
+		// Compute the total subtask count. We do this here in order to only
+		// do it once.
+		int count = 0;
+		for (ExecutionJobVertex vertex : jobVertices) {
+			count += vertex.getParallelism();
+		}
+		this.totalSubtaskCount = count;
+
+		// Latest snapshot is empty
+		latestSnapshot = new CheckpointStatsSnapshot(
+			counts.createSnapshot(),
+			summary.createSnapshot(),
+			history.createSnapshot(),
+			null);
+
+		// Register the metrics
+		registerMetrics(metricGroup);
+	}
+
+	/**
+	 * Returns the job's snapshotting settings which are derived from the
+	 * CheckpointConfig.
+	 *
+	 * @return The job's snapshotting settings.
+	 */
+	public JobSnapshottingSettings getSnapshottingSettings() {
+		return jobSnapshottingSettings;
+	}
+
+	/**
+	 * Creates a new snapshot of the available stats.
+	 *
+	 * @return The latest statistics snapshot.
+	 */
+	public CheckpointStatsSnapshot createSnapshot() {
+		CheckpointStatsSnapshot snapshot = latestSnapshot;
+
+		// Only create a new snapshot if dirty and no update in progress,
+		// because we don't want to block the coordinator.
+		if (dirty && statsReadWriteLock.tryLock()) {
+			try {
+				// Create a new snapshot
+				snapshot = new CheckpointStatsSnapshot(
+					counts.createSnapshot(),
+					summary.createSnapshot(),
+					history.createSnapshot(),
+					latestRestoredCheckpoint);
+
+				latestSnapshot = snapshot;
+
+				dirty = false;
+			} finally {
+				statsReadWriteLock.unlock();
+			}
+		}
+
+		return snapshot;
+	}
+
+	// ------------------------------------------------------------------------
+	// Callbacks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new pending checkpoint tracker.
+	 *
+	 * @param checkpointId ID of the checkpoint.
+	 * @param triggerTimestamp Trigger timestamp of the checkpoint.
+	 * @param props The checkpoint properties.
+	 * @return Tracker for statistics gathering.
+	 */
+	PendingCheckpointStats reportPendingCheckpoint(
+			long checkpointId,
+			long triggerTimestamp,
+			CheckpointProperties props) {
+
+		ConcurrentHashMap<JobVertexID, TaskStateStats> taskStateStats = createEmptyTaskStateStatsMap();
+
+		PendingCheckpointStats pending = new PendingCheckpointStats(
+				checkpointId,
+				triggerTimestamp,
+				props,
+				totalSubtaskCount,
+				taskStateStats,
+				new PendingCheckpointStatsCallback());
+
+		statsReadWriteLock.lock();
+		try {
+			counts.incrementInProgressCheckpoints();
+			history.addInProgressCheckpoint(pending);
+
+			dirty = true;
+		} finally {
+			statsReadWriteLock.unlock();
+		}
+
+		return pending;
+	}
+
+	void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
+		checkNotNull(restored, "Restored checkpoint");
+
+		statsReadWriteLock.lock();
+		try {
+			counts.incrementRestoredCheckpoints();
+			latestRestoredCheckpoint = restored;
+
+			dirty = true;
+		} finally {
+			statsReadWriteLock.unlock();
+		}
+	}
+
+	/**
+	 * Callback when a checkpoint completes.
+	 *
+	 * @param completed The completed checkpoint stats.
+	 */
+	private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+		statsReadWriteLock.lock();
+		try {
+			counts.incrementCompletedCheckpoints();
+			history.replacePendingCheckpointById(completed);
+
+			summary.updateSummary(completed);
+
+			dirty = true;
+		} finally {
+			statsReadWriteLock.unlock();
+		}
+	}
+
+	/**
+	 * Callback when a checkpoint fails.
+	 *
+	 * @param failed The failed checkpoint stats.
+	 */
+	private void reportFailedCheckpoint(FailedCheckpointStats failed) {
+		statsReadWriteLock.lock();
+		try {
+			counts.incrementFailedCheckpoints();
+			history.replacePendingCheckpointById(failed);
+
+			dirty = true;
+		} finally {
+			statsReadWriteLock.unlock();
+		}
+	}
+
+	/**
+	 * Creates an empty map with a {@link TaskStateStats} instance per task
+	 * that is involved in the checkpoint.
+	 *
+	 * @return An empty map with an {@link TaskStateStats} entry for each task that is involved in the checkpoint.
+	 */
+	private ConcurrentHashMap<JobVertexID, TaskStateStats> createEmptyTaskStateStatsMap() {
+		ConcurrentHashMap<JobVertexID, TaskStateStats> taskStatsMap = new ConcurrentHashMap<>(jobVertices.size());
+		for (ExecutionJobVertex vertex : jobVertices) {
+			TaskStateStats taskStats = new TaskStateStats(vertex.getJobVertexId(), vertex.getParallelism());
+			taskStatsMap.put(vertex.getJobVertexId(), taskStats);
+		}
+		return taskStatsMap;
+	}
+
+	/**
+	 * Callback for finalization of a pending checkpoint.
+	 */
+	class PendingCheckpointStatsCallback {
+
+		/**
+		 * Report a completed checkpoint.
+		 *
+		 * @param completed The completed checkpoint.
+		 */
+		void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+			CheckpointStatsTracker.this.reportCompletedCheckpoint(completed);
+		}
+
+		/**
+		 * Report a failed checkpoint.
+		 *
+		 * @param failed The failed checkpoint.
+		 */
+		void reportFailedCheckpoint(FailedCheckpointStats failed) {
+			CheckpointStatsTracker.this.reportFailedCheckpoint(failed);
+		}
+
+	}
+
+	// ------------------------------------------------------------------------
+	// Metrics
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	static final String NUMBER_OF_CHECKPOINTS_METRIC = "totalNumberOfCheckpoints";
+
+	@VisibleForTesting
+	static final String NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC = "numberOfInProgressCheckpoints";
+
+	@VisibleForTesting
+	static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC = "numberOfCompletedCheckpoints";
+
+	@VisibleForTesting
+	static final String NUMBER_OF_FAILED_CHECKPOINTS_METRIC = "numberOfFailedCheckpoints";
+
+	@VisibleForTesting
+	static final String LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC = "lastCheckpointRestoreTimestamp";
+
+	@VisibleForTesting
+	static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = "lastCheckpointSize";
+
+	@VisibleForTesting
+	static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";
+
+	@VisibleForTesting
+	static final String LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC = "lastCheckpointAlignmentBuffered";
+
+	@VisibleForTesting
+	static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC = "lastCheckpointExternalPath";
+
+	/**
+	 * Register the exposed metrics.
+	 *
+	 * @param metricGroup Metric group to use for the metrics.
+	 */
+	private void registerMetrics(MetricGroup metricGroup) {
+		metricGroup.gauge(NUMBER_OF_CHECKPOINTS_METRIC, new CheckpointsCounter());
+		metricGroup.gauge(NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC, new InProgressCheckpointsCounter());
+		metricGroup.gauge(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC, new CompletedCheckpointsCounter());
+		metricGroup.gauge(NUMBER_OF_FAILED_CHECKPOINTS_METRIC, new FailedCheckpointsCounter());
+		metricGroup.gauge(LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC, new LatestRestoredCheckpointTimestampGauge());
+		metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new LatestCompletedCheckpointSizeGauge());
+		metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC, new LatestCompletedCheckpointDurationGauge());
+		metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC, new LatestCompletedCheckpointAlignmentBufferedGauge());
+		metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC, new LatestCompletedCheckpointExternalPathGauge());
+	}
+
+	private class CheckpointsCounter implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			return counts.getTotalNumberOfCheckpoints();
+		}
+	}
+
+	private class InProgressCheckpointsCounter implements Gauge<Integer> {
+		@Override
+		public Integer getValue() {
+			return counts.getNumberOfInProgressCheckpoints();
+		}
+	}
+
+	private class CompletedCheckpointsCounter implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			return counts.getNumberOfCompletedCheckpoints();
+		}
+	}
+
+	private class FailedCheckpointsCounter implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			return counts.getNumberOfFailedCheckpoints();
+		}
+	}
+
+	private class LatestRestoredCheckpointTimestampGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			RestoredCheckpointStats restored = latestRestoredCheckpoint;
+			if (restored != null) {
+				return restored.getRestoreTimestamp();
+			} else {
+				return -1L;
+			}
+		}
+	}
+
+	private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			if (completed != null) {
+				return completed.getStateSize();
+			} else {
+				return -1L;
+			}
+		}
+	}
+
+	private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			if (completed != null) {
+				return completed.getEndToEndDuration();
+			} else {
+				return -1L;
+			}
+		}
+	}
+
+
+	private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> {
+		@Override
+		public Long getValue() {
+			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			if (completed != null) {
+				return completed.getAlignmentBuffered();
+			} else {
+				return -1L;
+			}
+		}
+	}
+
+	private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
+		@Override
+		public String getValue() {
+			CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+			if (completed != null) {
+				return completed.getExternalPath();
+			} else {
+				return "n/a";
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 0e70b1a..52f2a6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -62,6 +63,10 @@ public class CompletedCheckpoint implements Serializable {
 	/** External path if persisted checkpoint; <code>null</code> otherwise. */
 	private final String externalPath;
 
+	/** Optional stats tracker callback for discard. */
+	@Nullable
+	private transient CompletedCheckpointStats.DiscardCallback discardCallback;
+
 	// ------------------------------------------------------------------------
 
 	public CompletedCheckpoint(
@@ -160,6 +165,10 @@ public class CompletedCheckpoint implements Serializable {
 			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
 		} finally {
 			taskStates.clear();
+
+			if (discardCallback != null) {
+				discardCallback.notifyDiscardedCheckpoint();
+			}
 		}
 	}
 
@@ -185,6 +194,15 @@ public class CompletedCheckpoint implements Serializable {
 		return externalPath;
 	}
 
+	/**
+	 * Registers the stats hook that is notified once this checkpoint has been discarded.
+	 *
+	 * <p>Passing <code>null</code> clears any previously registered hook, in which case
+	 * discarding the checkpoint notifies nobody.
+	 *
+	 * @param discardCallback Hook to notify on discard, or <code>null</code> for none.
+	 */
+	void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
+		this.discardCallback = discardCallback;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
new file mode 100644
index 0000000..4d2d995
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a successfully completed checkpoint.
+ *
+ * <p>The reported statistics are immutable except for the discarded flag, which
+ * is updated via the {@link DiscardCallback} and the {@link CompletedCheckpoint}
+ * after an instance of this class has been created.
+ */
+public class CompletedCheckpointStats extends AbstractCheckpointStats {
+
+	private static final long serialVersionUID = 138833868551861344L;
+
+	/** Total checkpoint state size over all subtasks. */
+	private final long stateSize;
+
+	/** Buffered bytes during alignment over all subtasks. */
+	private final long alignmentBuffered;
+
+	/** The latest acknowledged subtask stats (never <code>null</code>). */
+	private final SubtaskStateStats latestAcknowledgedSubtask;
+
+	/** Optional external path if persisted externally. */
+	@Nullable
+	private final String externalPath;
+
+	/**
+	 * Flag indicating whether the checkpoint was discarded. Volatile so that an
+	 * update made through a {@link DiscardCallback} is visible to readers of
+	 * {@link #isDiscarded()}.
+	 */
+	private volatile boolean discarded;
+
+	/**
+	 * Creates a tracker for a {@link CompletedCheckpoint}.
+	 *
+	 * @param checkpointId ID of the checkpoint.
+	 * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+	 * @param props Checkpoint properties of the checkpoint.
+	 * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+	 * @param taskStats Task stats for each involved operator.
+	 * @param numAcknowledgedSubtasks Number of acknowledged subtasks; must equal <code>totalSubtaskCount</code>.
+	 * @param stateSize Total checkpoint state size over all subtasks; must be non-negative.
+	 * @param alignmentBuffered Buffered bytes during alignment over all subtasks.
+	 * @param latestAcknowledgedSubtask The latest acknowledged subtask stats.
+	 * @param externalPath Optional external path if persisted externally.
+	 * @throws IllegalArgumentException If not all subtasks acknowledged or the state size is negative.
+	 * @throws NullPointerException If <code>latestAcknowledgedSubtask</code> is <code>null</code>.
+	 */
+	CompletedCheckpointStats(
+		long checkpointId,
+		long triggerTimestamp,
+		CheckpointProperties props,
+		int totalSubtaskCount,
+		Map<JobVertexID, TaskStateStats> taskStats,
+		int numAcknowledgedSubtasks,
+		long stateSize,
+		long alignmentBuffered,
+		SubtaskStateStats latestAcknowledgedSubtask,
+		@Nullable String externalPath) {
+
+		super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+		checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
+		checkArgument(stateSize >= 0, "Negative state size");
+		this.stateSize = stateSize;
+		this.alignmentBuffered = alignmentBuffered;
+		this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
+		this.externalPath = externalPath;
+	}
+
+	@Override
+	public CheckpointStatsStatus getStatus() {
+		return CheckpointStatsStatus.COMPLETED;
+	}
+
+	/**
+	 * Returns the number of acknowledged subtasks. For a completed checkpoint this is
+	 * always the total number of subtasks (enforced by the constructor).
+	 */
+	@Override
+	public int getNumberOfAcknowledgedSubtasks() {
+		return numberOfSubtasks;
+	}
+
+	@Override
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	@Override
+	public long getAlignmentBuffered() {
+		return alignmentBuffered;
+	}
+
+	/**
+	 * Returns the stats of the latest acknowledged subtask. Never <code>null</code>
+	 * for a completed checkpoint (checked in the constructor).
+	 */
+	@Override
+	public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+		return latestAcknowledgedSubtask;
+	}
+
+	// ------------------------------------------------------------------------
+	// Completed checkpoint specific methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the external path if this checkpoint was persisted externally.
+	 *
+	 * @return External path of this checkpoint or <code>null</code>.
+	 */
+	@Nullable
+	public String getExternalPath() {
+		return externalPath;
+	}
+
+	/**
+	 * Returns whether the checkpoint has been discarded.
+	 *
+	 * @return <code>true</code> if the checkpoint has been discarded, <code>false</code> otherwise.
+	 */
+	public boolean isDiscarded() {
+		return discarded;
+	}
+
+	/**
+	 * Returns a callback for the {@link CompletedCheckpoint} to report its discard.
+	 *
+	 * <p>The callback is created on demand instead of being kept in a field.
+	 * {@link DiscardCallback} is a (non-static) inner class that does not implement
+	 * {@link java.io.Serializable}, so holding it in a non-transient field would make
+	 * Java serialization of these stats fail with a NotSerializableException.
+	 * Every returned instance updates the same discarded flag of this object.
+	 *
+	 * @return Callback for the {@link CompletedCheckpoint}.
+	 */
+	DiscardCallback getDiscardCallback() {
+		return new DiscardCallback();
+	}
+
+	/**
+	 * Callback for the {@link CompletedCheckpoint} instance to notify about
+	 * disposal of the checkpoint (most commonly when the checkpoint has been
+	 * subsumed by a newer one).
+	 */
+	class DiscardCallback {
+
+		/**
+		 * Updates the discarded flag of the checkpoint stats.
+		 *
+		 * <p>After this notification, {@link #isDiscarded()} will return
+		 * <code>true</code>.
+		 */
+		void notifyDiscardedCheckpoint() {
+			discarded = true;
+		}
+
+	}
+
+	@Override
+	public String toString() {
+		return "CompletedCheckpoint(id=" + getCheckpointId() + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
new file mode 100644
index 0000000..7e3f4b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Summary over <strong>all</strong> completed checkpoints.
+ */
+public class CompletedCheckpointStatsSummary implements Serializable {
+
+	private static final long serialVersionUID = 5784360461635814038L;
+
+	/** State size statistics for all completed checkpoints. */
+	private final MinMaxAvgStats stateSize;
+
+	/** Duration statistics for all completed checkpoints. */
+	private final MinMaxAvgStats duration;
+
+	/** Byte buffered during alignment for all completed checkpoints. */
+	private final MinMaxAvgStats alignmentBuffered;
+
+	CompletedCheckpointStatsSummary() {
+		this(new MinMaxAvgStats(), new MinMaxAvgStats(), new MinMaxAvgStats());
+	}
+
+	private CompletedCheckpointStatsSummary(
+			MinMaxAvgStats stateSize,
+			MinMaxAvgStats duration,
+			MinMaxAvgStats alignmentBuffered) {
+
+		this.stateSize = checkNotNull(stateSize);
+		this.duration = checkNotNull(duration);
+		this.alignmentBuffered = checkNotNull(alignmentBuffered);
+	}
+
+	/**
+	 * Updates the summary with the given completed checkpoint.
+	 *
+	 * @param completed Completed checkpoint to update the summary with.
+	 */
+	void updateSummary(CompletedCheckpointStats completed) {
+		stateSize.add(completed.getStateSize());
+		duration.add(completed.getEndToEndDuration());
+		alignmentBuffered.add(completed.getAlignmentBuffered());
+	}
+
+	/**
+	 * Creates a snapshot of the current state.
+	 *
+	 * @return A snapshot of the current state.
+	 */
+	CompletedCheckpointStatsSummary createSnapshot() {
+		return new CompletedCheckpointStatsSummary(
+				stateSize.createSnapshot(),
+				duration.createSnapshot(),
+				alignmentBuffered.createSnapshot());
+	}
+
+	/**
+	 * Returns the summary stats for the state size of completed checkpoints.
+	 *
+	 * @return Summary stats for the state size.
+	 */
+	public MinMaxAvgStats getStateSizeStats() {
+		return stateSize;
+	}
+
+	/**
+	 * Returns the summary stats for the duration of completed checkpoints.
+	 *
+	 * @return Summary stats for the duration.
+	 */
+	public MinMaxAvgStats getEndToEndDurationStats() {
+		return duration;
+	}
+
+	/**
+	 * Returns the summary stats for the bytes buffered during alignment.
+	 *
+	 * <p>If no alignments are reported or happen (at least once mode), the
+	 * returned stats are in their initial state.
+	 *
+	 * @return Summary stats for the bytes buffered during alignment.
+	 */
+	public MinMaxAvgStats getAlignmentBufferedStats() {
+		return alignmentBuffered;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
new file mode 100644
index 0000000..83d7c3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Statistics for a failed checkpoint.
+ *
+ * <p>The reported statistics are immutable.
+ */
+public class FailedCheckpointStats extends AbstractCheckpointStats {
+
+	// Explicit UID, consistent with the other Serializable stats classes in this
+	// package; avoids relying on the compiler-computed default.
+	private static final long serialVersionUID = 8000748529515900106L;
+
+	/** Number of acknowledged tasks. */
+	private final int numAcknowledgedSubtasks;
+
+	/** Total checkpoint state size over all subtasks. */
+	private final long stateSize;
+
+	/** Buffered bytes during alignment over all subtasks. */
+	private final long alignmentBuffered;
+
+	/** Timestamp when the checkpoint was failed at the coordinator. */
+	private final long failureTimestamp;
+
+	/**
+	 * The latest acknowledged subtask stats if any subtask was acknowledged
+	 * before failing the checkpoint.
+	 */
+	@Nullable
+	private final SubtaskStateStats latestAcknowledgedSubtask;
+
+	/**
+	 * Optional failure message. Only the message of the cause is retained, not the
+	 * Throwable itself; it is <code>null</code> if there was no cause or the cause
+	 * carried no message.
+	 */
+	@Nullable
+	private final String failureMsg;
+
+	/**
+	 * Creates a tracker for a failed checkpoint.
+	 *
+	 * @param checkpointId ID of the checkpoint.
+	 * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+	 * @param props Checkpoint properties of the checkpoint.
+	 * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+	 * @param taskStats Task stats for each involved operator.
+	 * @param numAcknowledgedSubtasks Number of acknowledged subtasks; must be non-negative.
+	 * @param stateSize Total checkpoint state size over all subtasks; must be non-negative.
+	 * @param alignmentBuffered Buffered bytes during alignment over all subtasks.
+	 * @param failureTimestamp Timestamp when this checkpoint failed.
+	 * @param latestAcknowledgedSubtask The latest acknowledged subtask stats or <code>null</code>.
+	 * @param cause Cause of the checkpoint failure or <code>null</code>.
+	 * @throws IllegalArgumentException If the number of ACKs or the state size is negative.
+	 */
+	FailedCheckpointStats(
+		long checkpointId,
+		long triggerTimestamp,
+		CheckpointProperties props,
+		int totalSubtaskCount,
+		Map<JobVertexID, TaskStateStats> taskStats,
+		int numAcknowledgedSubtasks,
+		long stateSize,
+		long alignmentBuffered,
+		long failureTimestamp,
+		@Nullable SubtaskStateStats latestAcknowledgedSubtask,
+		@Nullable Throwable cause) {
+
+		super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+		checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs");
+		this.numAcknowledgedSubtasks = numAcknowledgedSubtasks;
+		checkArgument(stateSize >= 0, "Negative state size");
+		this.stateSize = stateSize;
+		this.alignmentBuffered = alignmentBuffered;
+		this.failureTimestamp = failureTimestamp;
+		this.latestAcknowledgedSubtask = latestAcknowledgedSubtask;
+		this.failureMsg = cause != null ? cause.getMessage() : null;
+	}
+
+	@Override
+	public CheckpointStatsStatus getStatus() {
+		return CheckpointStatsStatus.FAILED;
+	}
+
+	@Override
+	public int getNumberOfAcknowledgedSubtasks() {
+		return numAcknowledgedSubtasks;
+	}
+
+	@Override
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	@Override
+	public long getAlignmentBuffered() {
+		return alignmentBuffered;
+	}
+
+	@Override
+	@Nullable
+	public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+		return latestAcknowledgedSubtask;
+	}
+
+	/**
+	 * Returns the end to end duration until the checkpoint failure.
+	 *
+	 * <p>The result is clamped at 0, so a failure timestamp that lies before the
+	 * trigger timestamp never yields a negative duration.
+	 */
+	@Override
+	public long getEndToEndDuration() {
+		return Math.max(0, failureTimestamp - triggerTimestamp);
+	}
+
+	// ------------------------------------------------------------------------
+	// Failed checkpoint specific methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the timestamp when this checkpoint failed.
+	 *
+	 * @return Timestamp when the checkpoint failed.
+	 */
+	public long getFailureTimestamp() {
+		return failureTimestamp;
+	}
+
+	/**
+	 * Returns the failure message, or <code>null</code> if no cause was provided
+	 * or the cause had no message.
+	 *
+	 * @return Failure message of the checkpoint failure or <code>null</code>.
+	 */
+	@Nullable
+	public String getFailureMessage() {
+		return failureMsg;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
new file mode 100644
index 0000000..9d4c116
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+/**
+ * Helper for keeping track of min/max/average summaries.
+ */
+public class MinMaxAvgStats implements Serializable {
+
+	private static final long serialVersionUID = 1769601903483446707L;
+
+	/** Current min value. */
+	private long min;
+
+	/** Current max value. */
+	private long max;
+
+	/** Sum of all added values. */
+	private long sum;
+
+	/** Count of added values. */
+	private long count;
+
+	MinMaxAvgStats() {
+	}
+
+	private MinMaxAvgStats(long min, long max, long sum, long count) {
+		this.min = min;
+		this.max = max;
+		this.sum = sum;
+		this.count = count;
+	}
+
+	/**
+	 * Adds the value to the stats if it is >= 0.
+	 *
+	 * @param value Value to add for min/max/avg stats..
+	 */
+	void add(long value) {
+		if (value >= 0) {
+			if (count > 0) {
+				min =  Math.min(min, value);
+				max = Math.max(max, value);
+			} else {
+				min = value;
+				max = value;
+			}
+
+			count++;
+			sum += value;
+		}
+	}
+
+	/**
+	 * Returns a snapshot of the current state.
+	 *
+	 * @return A snapshot of the current state.
+	 */
+	MinMaxAvgStats createSnapshot() {
+		return new MinMaxAvgStats(min, max, sum, count);
+	}
+
+	/**
+	 * Returns the minimum seen value.
+	 *
+	 * @return The current minimum value.
+	 */
+	public long getMinimum() {
+		return min;
+	}
+
+	/**
+	 * Returns the maximum seen value.
+	 *
+	 * @return The current maximum value.
+	 */
+	public long getMaximum() {
+		return max;
+	}
+
+	/**
+	 * Returns the sum of all seen values.
+	 *
+	 * @return Sum of all values.
+	 */
+	public long getSum() {
+		return sum;
+	}
+
+	/**
+	 * Returns the count of all seen values.
+	 *
+	 * @return Count of all values.
+	 */
+	public long getCount() {
+		return count;
+	}
+
+	/**
+	 * Calculates the average over all seen values.
+	 *
+	 * @return Average over all seen values.
+	 */
+	public long getAverage() {
+		if (count == 0) {
+			return 0;
+		} else {
+			return sum / count;
+		}
+	}
+
+}