You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:49:42 UTC

[03/11] flink git commit: [FLINK-4410] [runtime-web] Add detailed checkpoint stats handlers

[FLINK-4410] [runtime-web] Add detailed checkpoint stats handlers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dec0d6bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dec0d6bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dec0d6bb

Branch: refs/heads/master
Commit: dec0d6bb702cfcbf30e60f6011565fd9455bb121
Parents: 579bc96
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 23 20:44:12 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Jan 10 09:48:52 2017 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  21 +-
 .../AbstractJobVertexRequestHandler.java        |  34 +-
 .../checkpoints/CheckpointConfigHandler.java    |  77 +++++
 .../checkpoints/CheckpointStatsCache.java       |  80 +++++
 .../CheckpointStatsDetailsHandler.java          | 153 +++++++++
 .../CheckpointStatsDetailsSubtasksHandler.java  | 189 ++++++++++
 .../checkpoints/CheckpointStatsHandler.java     | 235 +++++++++++++
 .../CheckpointConfigHandlerTest.java            | 146 ++++++++
 .../checkpoints/CheckpointStatsCacheTest.java   |  67 ++++
 .../CheckpointStatsDetailsHandlerTest.java      | 286 ++++++++++++++++
 .../checkpoints/CheckpointStatsHandlerTest.java | 303 ++++++++++++++++
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 342 +++++++++++++++++++
 12 files changed, 1916 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index c5b7d35..3080b57 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -53,7 +53,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
-import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
@@ -62,7 +61,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
@@ -73,6 +71,11 @@ import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandl
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
@@ -273,7 +276,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
 							currentGraphs,
 							backPressureStatsTracker,
@@ -288,7 +290,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
 			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
 
 			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
@@ -328,6 +329,18 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// DELETE is the preferred way of stopping a job (Rest-conform)
 			.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
 
+		int maxCachedEntries = config.getInteger(
+				ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
+
+		// Register the checkpoint stats handlers
+		router
+			.GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs)))
+			.GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
+			.GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
+
 		if (webSubmitAllow) {
 			router
 				// fetch the list of uploaded jars.

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index a36f94a..38243e5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -37,26 +37,34 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 
 	@Override
 	public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		final String vidString = params.get("vertexid");
-		if (vidString == null) {
-			throw new IllegalArgumentException("vertexId parameter missing");
-		}
-
-		final JobVertexID vid;
-		try {
-			vid = JobVertexID.fromHexString(vidString);
-		}
-		catch (Exception e) {
-			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
-		}
+		final JobVertexID vid = parseJobVertexId(params);
 
 		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
 		if (jobVertex == null) {
-			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+			throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists.");
 		}
 
 		return handleRequest(jobVertex, params);
 	}
+
+	/**
+	 * Returns the job vertex ID parsed from the provided parameters.
+	 *
+	 * @param params Path parameters
+	 * @return Parsed job vertex ID or <code>null</code> if not available.
+	 */
+	public static JobVertexID parseJobVertexId(Map<String, String> params) {
+		String jobVertexIdParam = params.get("vertexid");
+		if (jobVertexIdParam == null) {
+			return null;
+		}
+
+		try {
+			return JobVertexID.fromHexString(jobVertexIdParam);
+		} catch (RuntimeException ignored) {
+			return null;
+		}
+	}
 	
 	public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..1ad5e65
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handler that returns a job's snapshotting settings.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+		JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+
+		gen.writeStartObject();
+		{
+			gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
+			gen.writeNumberField("interval", settings.getCheckpointInterval());
+			gen.writeNumberField("timeout", settings.getCheckpointTimeout());
+			gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
+			gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
+
+			ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
+			gen.writeObjectFieldStart("externalization");
+			{
+				if (externalization.externalizeCheckpoints()) {
+					gen.writeBooleanField("enabled", true);
+					gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+				} else {
+					gen.writeBooleanField("enabled", false);
+				}
+			}
+			gen.writeEndObject();
+
+		}
+		gen.writeEndObject();
+
+		gen.close();
+
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..35d529a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example if you manage to click on the last
+ * checkpoint in the history, it is not available via the stats as soon
+ * as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+	@Nullable
+	private final Cache<Long, AbstractCheckpointStats> cache;
+
+	public CheckpointStatsCache(int maxNumEntries) {
+		if (maxNumEntries > 0) {
+			this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+				.maximumSize(maxNumEntries)
+				.build();
+		} else {
+			this.cache = null;
+		}
+	}
+
+	/**
+	 * Try to add the checkpoint to the cache.
+	 *
+	 * @param checkpoint Checkpoint to be added.
+	 */
+	void tryAdd(AbstractCheckpointStats checkpoint) {
+		// Don't add in progress checkpoints as they will be replaced by their
+		// completed/failed version eventually.
+		if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
+			cache.put(checkpoint.getCheckpointId(), checkpoint);
+		}
+	}
+
+	/**
+	 * Try to look up a checkpoint by it's ID in the cache.
+	 *
+	 * @param checkpointId ID of the checkpoint to look up.
+	 * @return The checkpoint or <code>null</code> if checkpoint not found.
+	 */
+	AbstractCheckpointStats tryGet(long checkpointId) {
+		if (cache != null) {
+			return cache.getIfPresent(checkpointId);
+		} else {
+			return null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
new file mode 100644
index 0000000..6bb8300
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex.
+ */
+public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private final CheckpointStatsCache cache;
+
+	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
+		super(executionGraphHolder);
+		this.cache = cache;
+	}
+
+	@Override
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		long checkpointId = parseCheckpointId(params);
+		if (checkpointId == -1) {
+			return "{}";
+		}
+
+		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+		if (checkpoint != null) {
+			cache.tryAdd(checkpoint);
+		} else {
+			checkpoint = cache.tryGet(checkpointId);
+
+			if (checkpoint == null) {
+				return "{}";
+			}
+		}
+
+		return writeResponse(checkpoint);
+	}
+
+	private String writeResponse(AbstractCheckpointStats checkpoint) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+		gen.writeStartObject();
+
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeStringField("status", checkpoint.getStatus().toString());
+		gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", checkpoint.getStateSize());
+		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+		gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+		gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+		if (checkpoint.getStatus().isCompleted()) {
+			// --- Completed ---
+			CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+			String externalPath = completed.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+
+			gen.writeBooleanField("discarded", completed.isDiscarded());
+		}
+		else if (checkpoint.getStatus().isFailed()) {
+			// --- Failed ---
+			FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+			String failureMsg = failed.getFailureMessage();
+			if (failureMsg != null) {
+				gen.writeStringField("failure_message", failureMsg);
+			}
+		}
+
+		gen.writeObjectFieldStart("tasks");
+		for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+			gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
+
+			gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", taskStats.getStateSize());
+			gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+			gen.writeEndObject();
+		}
+		gen.writeEndObject();
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+	/**
+	 * Returns the checkpoint ID parsed from the provided parameters.
+	 *
+	 * @param params Path parameters
+	 * @return Parsed checkpoint ID or <code>-1</code> if not available.
+	 */
+	static long parseCheckpointId(Map<String, String> params) {
+		String param = params.get("checkpointid");
+		if (param == null) {
+			return -1;
+		}
+
+		try {
+			return Long.parseLong(param);
+		} catch (NumberFormatException ignored) {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..3e3088b
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
+
+	private final CheckpointStatsCache cache;
+
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
+		super(executionGraphHolder);
+		this.cache = checkNotNull(cache);
+	}
+
+	@Override
+	public String handleJsonRequest(
+		Map<String, String> pathParams,
+		Map<String, String> queryParams,
+		ActorGateway jobManager) throws Exception {
+		return super.handleJsonRequest(pathParams, queryParams, jobManager);
+	}
+
+	@Override
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
+		if (checkpointId == -1) {
+			return "{}";
+		}
+
+		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
+		if (vertexId == null) {
+			return "{}";
+		}
+
+		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+		if (checkpoint != null) {
+			cache.tryAdd(checkpoint);
+		} else {
+			checkpoint = cache.tryGet(checkpointId);
+
+			if (checkpoint == null) {
+				return "{}";
+			}
+		}
+
+		return writeResponse(checkpoint, vertexId);
+	}
+
+	private String writeResponse(AbstractCheckpointStats checkpoint, JobVertexID vertexId) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+		gen.writeStartObject();
+
+		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
+		if (taskStats == null) {
+			return "{}";
+		}
+
+		// Overview
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeStringField("status", checkpoint.getStatus().toString());
+		gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", taskStats.getStateSize());
+		gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+		gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+		gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+		gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+		if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+			gen.writeObjectFieldStart("summary");
+			gen.writeObjectFieldStart("state_size");
+			writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("end_to_end_duration");
+			MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
+			gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("checkpoint_duration");
+			gen.writeObjectFieldStart("sync");
+			writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+			gen.writeEndObject();
+			gen.writeObjectFieldStart("async");
+			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+			gen.writeEndObject();
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("alignment");
+			gen.writeObjectFieldStart("buffered");
+			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
+			gen.writeEndObject();
+			gen.writeObjectFieldStart("duration");
+			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
+			gen.writeEndObject();
+			gen.writeEndObject();
+			gen.writeEndObject();
+		}
+
+		SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
+
+		gen.writeArrayFieldStart("subtasks");
+		for (int i = 0; i < subtasks.length; i++) {
+			SubtaskStateStats subtask = subtasks[i];
+
+			gen.writeStartObject();
+			gen.writeNumberField("index", i);
+
+			if (subtask != null) {
+				gen.writeStringField("status", "completed");
+				gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
+				gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+				gen.writeNumberField("state_size", subtask.getStateSize());
+
+				gen.writeObjectFieldStart("checkpoint");
+				gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
+				gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
+				gen.writeEndObject();
+
+				gen.writeObjectFieldStart("alignment");
+				gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
+				gen.writeNumberField("duration", subtask.getAlignmentDuration());
+				gen.writeEndObject();
+			} else {
+				gen.writeStringField("status", "pending");
+			}
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+	private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+		gen.writeNumberField("min", minMaxAvg.getMinimum());
+		gen.writeNumberField("max", minMaxAvg.getMaximum());
+		gen.writeNumberField("avg", minMaxAvg.getAverage());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..71f3637
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ */
+public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
+
+	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+		gen.writeStartObject();
+
+		// Counts
+		writeCounts(gen, snapshot.getCounts());
+
+		// Summary
+		writeSummary(gen, snapshot.getSummaryStats());
+
+		CheckpointStatsHistory history = snapshot.getHistory();
+
+		// Latest
+		writeLatestCheckpoints(
+			gen,
+			history.getLatestCompletedCheckpoint(),
+			history.getLatestSavepoint(),
+			history.getLatestFailedCheckpoint(),
+			snapshot.getLatestRestoredCheckpoint());
+
+		// History
+		writeHistory(gen, snapshot.getHistory());
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+	private void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+		gen.writeObjectFieldStart("counts");
+		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
+		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
+		gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
+		gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
+		gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+		gen.writeEndObject();
+	}
+
+	private void writeSummary(
+		JsonGenerator gen,
+		CompletedCheckpointStatsSummary summary) throws IOException {
+		gen.writeObjectFieldStart("summary");
+		gen.writeObjectFieldStart("state_size");
+		writeMinMaxAvg(gen, summary.getStateSizeStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("end_to_end_duration");
+		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("alignment_buffered");
+		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+		gen.writeEndObject();
+		gen.writeEndObject();
+	}
+
+	private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+		gen.writeNumberField("min", minMaxAvg.getMinimum());
+		gen.writeNumberField("max", minMaxAvg.getMaximum());
+		gen.writeNumberField("avg", minMaxAvg.getAverage());
+	}
+
+	private void writeLatestCheckpoints(
+		JsonGenerator gen,
+		@Nullable CompletedCheckpointStats completed,
+		@Nullable CompletedCheckpointStats savepoint,
+		@Nullable FailedCheckpointStats failed,
+		@Nullable RestoredCheckpointStats restored) throws IOException {
+
+		gen.writeObjectFieldStart("latest");
+		// Completed checkpoint
+		if (completed != null) {
+			gen.writeObjectFieldStart("completed");
+			writeCheckpoint(gen, completed);
+
+			String externalPath = completed.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", completed.getExternalPath());
+			}
+
+			gen.writeEndObject();
+		}
+
+		// Completed savepoint
+		if (savepoint != null) {
+			gen.writeObjectFieldStart("savepoint");
+			writeCheckpoint(gen, savepoint);
+
+			String externalPath = savepoint.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", savepoint.getExternalPath());
+			}
+			gen.writeEndObject();
+		}
+
+		// Failed checkpoint
+		if (failed != null) {
+			gen.writeObjectFieldStart("failed");
+			writeCheckpoint(gen, failed);
+
+			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+			String failureMsg = failed.getFailureMessage();
+			if (failureMsg != null) {
+				gen.writeStringField("failure_message", failureMsg);
+			}
+			gen.writeEndObject();
+		}
+
+		// Restored checkpoint
+		if (restored != null) {
+			gen.writeObjectFieldStart("restored");
+			gen.writeNumberField("id", restored.getCheckpointId());
+			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
+			gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties()));
+
+			String externalPath = restored.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+			gen.writeEndObject();
+		}
+		gen.writeEndObject();
+	}
+
+	private void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", checkpoint.getStateSize());
+		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+
+	}
+
+	private void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+		gen.writeArrayFieldStart("history");
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			gen.writeStartObject();
+			gen.writeNumberField("id", checkpoint.getCheckpointId());
+			gen.writeStringField("status", checkpoint.getStatus().toString());
+			gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", checkpoint.getStateSize());
+			gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+			gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+			if (checkpoint.getStatus().isCompleted()) {
+				// --- Completed ---
+				CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+				String externalPath = completed.getExternalPath();
+				if (externalPath != null) {
+					gen.writeStringField("external_path", externalPath);
+				}
+
+				gen.writeBooleanField("discarded", completed.isDiscarded());
+			}
+			else if (checkpoint.getStatus().isFailed()) {
+				// --- Failed ---
+				FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+				gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+				String failureMsg = failed.getFailureMessage();
+				if (failureMsg != null) {
+					gen.writeStringField("failure_message", failureMsg);
+				}
+			}
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
new file mode 100644
index 0000000..410e044
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointConfigHandlerTest {
+
+	/**
+	 * Tests a simple config.
+	 */
+	@Test
+	public void testSimpleConfig() throws Exception {
+		long interval = 18231823L;
+		long timeout = 996979L;
+		long minPause = 119191919L;
+		int maxConcurrent = 12929329;
+		ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none();
+
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			interval,
+			timeout,
+			minPause,
+			maxConcurrent,
+			externalized,
+			true);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		assertEquals("exactly_once", rootNode.get("mode").asText());
+		assertEquals(interval, rootNode.get("interval").asLong());
+		assertEquals(timeout, rootNode.get("timeout").asLong());
+		assertEquals(minPause, rootNode.get("min_pause").asLong());
+		assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt());
+
+		JsonNode externalizedNode = rootNode.get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(false, externalizedNode.get("enabled").asBoolean());
+	}
+
+	/**
+	 * Tests the that the isExactlyOnce flag is respected.
+	 */
+	@Test
+	public void testAtLeastOnce() throws Exception {
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			996979L,
+			1818L,
+			1212L,
+			12,
+			ExternalizedCheckpointSettings.none(),
+			false); // at least once
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		assertEquals("at_least_once", rootNode.get("mode").asText());
+	}
+
+	/**
+	 * Tests that the externalized checkpoint settings are forwarded.
+	 */
+	@Test
+	public void testEnabledExternalizedCheckpointSettings() throws Exception {
+		ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			996979L,
+			1818L,
+			1212L,
+			12,
+			externalizedSettings,
+			false); // at least once
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
new file mode 100644
index 0000000..0fada97
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsCacheTest {
+
+	@Test
+	public void testZeroSizeCache() throws Exception {
+		AbstractCheckpointStats checkpoint = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+
+		CheckpointStatsCache cache = new CheckpointStatsCache(0);
+		cache.tryAdd(checkpoint);
+		assertNull(cache.tryGet(0L));
+	}
+
+	@Test
+	public void testCacheAddAndGet() throws Exception {
+		AbstractCheckpointStats chk0 = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+		AbstractCheckpointStats chk1 = createCheckpoint(1, CheckpointStatsStatus.COMPLETED);
+		AbstractCheckpointStats chk2 = createCheckpoint(2, CheckpointStatsStatus.IN_PROGRESS);
+
+		CheckpointStatsCache cache = new CheckpointStatsCache(1);
+		cache.tryAdd(chk0);
+		assertEquals(chk0, cache.tryGet(0));
+
+		cache.tryAdd(chk1);
+		assertNull(cache.tryGet(0));
+		assertEquals(chk1, cache.tryGet(1));
+
+		cache.tryAdd(chk2);
+		assertNull(cache.tryGet(2));
+		assertNull(cache.tryGet(0));
+		assertEquals(chk1, cache.tryGet(1));
+	}
+
+	private AbstractCheckpointStats createCheckpoint(long id, CheckpointStatsStatus status) {
+		AbstractCheckpointStats checkpoint = mock(AbstractCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(id);
+		when(checkpoint.getStatus()).thenReturn(status);
+		return checkpoint;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
new file mode 100644
index 0000000..17c8558
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsDetailsHandlerTest {
+
+	/**
+	 * Tests request with illegal checkpoint ID param.
+	 */
+	@Test
+	public void testIllegalCheckpointId() throws Exception {
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "illegal checkpoint");
+		String json = handler.handleRequest(graph, params);
+
+		assertEquals("{}", json);
+	}
+
+	/**
+	 * Tests request with missing checkpoint ID param.
+	 */
+	@Test
+	public void testNoCheckpointIdParam() throws Exception {
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		assertEquals("{}", json);
+	}
+
+	/**
+	 * Test lookup of not existing checkpoint in history.
+	 */
+	@Test
+	public void testCheckpointNotFound() throws Exception {
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpointById(anyLong())).thenReturn(null); // not found
+
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.createSnapshot()).thenReturn(snapshot);
+
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "123");
+		String json = handler.handleRequest(graph, params);
+
+		assertEquals("{}", json);
+		verify(history, times(1)).getCheckpointById(anyLong());
+	}
+
+	/**
+	 * Tests a checkpoint details request for an in progress checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestInProgressCheckpoint() throws Exception {
+		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(checkpoint.getStateSize()).thenReturn(111939272822L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(121191L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(501);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+
+		verifyTaskNode(task1, rootNode);
+		verifyTaskNode(task2, rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a completed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+		CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+		when(checkpoint.getStateSize()).thenReturn(925281L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+		when(checkpoint.isDiscarded()).thenReturn(true);
+		when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path");
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.isDiscarded(), rootNode.get("discarded").asBoolean());
+		assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+
+		verifyTaskNode(task1, rootNode);
+		verifyTaskNode(task2, rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a failed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+		FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+		when(checkpoint.getStateSize()).thenReturn(925281L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+		when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L);
+		when(checkpoint.getFailureMessage()).thenReturn("failure-message");
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.getFailureTimestamp(), rootNode.get("failure_timestamp").asLong());
+		assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+
+		verifyTaskNode(task1, rootNode);
+		verifyTaskNode(task2, rootNode);
+	}
+
+	// ------------------------------------------------------------------------
+
+	static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.createSnapshot()).thenReturn(snapshot);
+
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "123");
+		String json = handler.handleRequest(graph, params);
+
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.readTree(json);
+	}
+
+	static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
+		long duration = ThreadLocalRandom.current().nextInt(128);
+
+		JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+		assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+		assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+		assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
+		assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+		assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+		assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+	}
+
+	private static TaskStateStats createTaskStateStats() {
+		ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+		TaskStateStats task = mock(TaskStateStats.class);
+		when(task.getJobVertexId()).thenReturn(new JobVertexID());
+		when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+		when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+		return task;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
new file mode 100644
index 0000000..8274b36
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsHandlerTest {
+
+	/**
+	 * Tests a complete checkpoint stats snapshot.
+	 */
+	@Test
+	public void testCheckpointStatsRequest() throws Exception {
+		// Counts
+		CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
+		when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
+		when(counts.getTotalNumberOfCheckpoints()).thenReturn(12981231203L);
+		when(counts.getNumberOfInProgressCheckpoints()).thenReturn(191919);
+		when(counts.getNumberOfCompletedCheckpoints()).thenReturn(882828200L);
+		when(counts.getNumberOfFailedCheckpoints()).thenReturn(99171510L);
+
+		// Summary
+		CompletedCheckpointStatsSummary summary = mock(CompletedCheckpointStatsSummary.class);
+
+		MinMaxAvgStats stateSizeSummary = mock(MinMaxAvgStats.class);
+		when(stateSizeSummary.getMinimum()).thenReturn(81238123L);
+		when(stateSizeSummary.getMaximum()).thenReturn(19919191999L);
+		when(stateSizeSummary.getAverage()).thenReturn(1133L);
+
+		MinMaxAvgStats durationSummary = mock(MinMaxAvgStats.class);
+		when(durationSummary.getMinimum()).thenReturn(1182L);
+		when(durationSummary.getMaximum()).thenReturn(88654L);
+		when(durationSummary.getAverage()).thenReturn(171L);
+
+		MinMaxAvgStats alignmentBufferedSummary = mock(MinMaxAvgStats.class);
+		when(alignmentBufferedSummary.getMinimum()).thenReturn(81818181899L);
+		when(alignmentBufferedSummary.getMaximum()).thenReturn(89999911118654L);
+		when(alignmentBufferedSummary.getAverage()).thenReturn(11203131L);
+
+		when(summary.getStateSizeStats()).thenReturn(stateSizeSummary);
+		when(summary.getEndToEndDurationStats()).thenReturn(durationSummary);
+		when(summary.getAlignmentBufferedStats()).thenReturn(alignmentBufferedSummary);
+
+		// Latest
+		CompletedCheckpointStats latestCompleted = mock(CompletedCheckpointStats.class);
+		when(latestCompleted.getCheckpointId()).thenReturn(1992139L);
+		when(latestCompleted.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(latestCompleted.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(latestCompleted.getStateSize()).thenReturn(111939272822L);
+		when(latestCompleted.getEndToEndDuration()).thenReturn(121191L);
+		when(latestCompleted.getAlignmentBuffered()).thenReturn(1L);
+		when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
+
+		CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
+		when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+		when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
+		when(latestSavepoint.getEndToEndDuration()).thenReturn(121191L);
+		when(latestCompleted.getAlignmentBuffered()).thenReturn(182813L);
+		when(latestSavepoint.getExternalPath()).thenReturn("savepoint-external-path");
+
+		FailedCheckpointStats latestFailed = mock(FailedCheckpointStats.class);
+		when(latestFailed.getCheckpointId()).thenReturn(1112L);
+		when(latestFailed.getTriggerTimestamp()).thenReturn(12828L);
+		when(latestFailed.getLatestAckTimestamp()).thenReturn(1901L);
+		when(latestFailed.getFailureTimestamp()).thenReturn(11999976L);
+		when(latestFailed.getStateSize()).thenReturn(111L);
+		when(latestFailed.getEndToEndDuration()).thenReturn(12L);
+		when(latestFailed.getAlignmentBuffered()).thenReturn(2L);
+		when(latestFailed.getFailureMessage()).thenReturn("expected cause");
+
+		RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class);
+		when(latestRestored.getCheckpointId()).thenReturn(1199L);
+		when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
+		when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(latestRestored.getExternalPath()).thenReturn("restored savepoint path");
+
+		// History
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+
+		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
+		when(inProgress.getCheckpointId()).thenReturn(1992139L);
+		when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(inProgress.getStateSize()).thenReturn(111939272822L);
+		when(inProgress.getEndToEndDuration()).thenReturn(121191L);
+		when(inProgress.getAlignmentBuffered()).thenReturn(1L);
+		when(inProgress.getNumberOfSubtasks()).thenReturn(501);
+		when(inProgress.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+		CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class);
+		when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
+		when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+		when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
+		when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
+		when(completedSavepoint.getStateSize()).thenReturn(1119822L);
+		when(completedSavepoint.getEndToEndDuration()).thenReturn(12191L);
+		when(completedSavepoint.getAlignmentBuffered()).thenReturn(111L);
+		when(completedSavepoint.getNumberOfSubtasks()).thenReturn(33501);
+		when(completedSavepoint.getNumberOfAcknowledgedSubtasks()).thenReturn(211);
+		when(completedSavepoint.isDiscarded()).thenReturn(true);
+		when(completedSavepoint.getExternalPath()).thenReturn("completed-external-path");
+
+		FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
+		when(failed.getCheckpointId()).thenReturn(110719L);
+		when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+		when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(failed.getTriggerTimestamp()).thenReturn(191900L);
+		when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
+		when(failed.getStateSize()).thenReturn(1119822L);
+		when(failed.getEndToEndDuration()).thenReturn(12191L);
+		when(failed.getAlignmentBuffered()).thenReturn(111L);
+		when(failed.getNumberOfSubtasks()).thenReturn(33501);
+		when(failed.getNumberOfAcknowledgedSubtasks()).thenReturn(1);
+		when(failed.getFailureTimestamp()).thenReturn(119230L);
+		when(failed.getFailureMessage()).thenReturn("failure message");
+
+		checkpoints.add(inProgress);
+		checkpoints.add(completedSavepoint);
+		checkpoints.add(failed);
+		when(history.getCheckpoints()).thenReturn(checkpoints);
+		when(history.getLatestCompletedCheckpoint()).thenReturn(latestCompleted);
+		when(history.getLatestSavepoint()).thenReturn(latestSavepoint);
+		when(history.getLatestFailedCheckpoint()).thenReturn(latestFailed);
+
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getCounts()).thenReturn(counts);
+		when(snapshot.getSummaryStats()).thenReturn(summary);
+		when(snapshot.getHistory()).thenReturn(history);
+		when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+		when(tracker.createSnapshot()).thenReturn(snapshot);
+
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		JsonNode countNode = rootNode.get("counts");
+		assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong());
+		assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong());
+		assertEquals(counts.getNumberOfInProgressCheckpoints(), countNode.get("in_progress").asLong());
+		assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong());
+		assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong());
+
+		JsonNode summaryNode = rootNode.get("summary");
+		JsonNode sizeSummaryNode = summaryNode.get("state_size");
+		assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong());
+		assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong());
+		assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong());
+
+		JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration");
+		assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong());
+		assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong());
+		assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong());
+
+		JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered");
+		assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong());
+		assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong());
+		assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong());
+
+		JsonNode latestNode = rootNode.get("latest");
+		JsonNode latestCheckpointNode = latestNode.get("completed");
+		assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong());
+		assertEquals(latestCompleted.getTriggerTimestamp(), latestCheckpointNode.get("trigger_timestamp").asLong());
+		assertEquals(latestCompleted.getLatestAckTimestamp(), latestCheckpointNode.get("latest_ack_timestamp").asLong());
+		assertEquals(latestCompleted.getStateSize(), latestCheckpointNode.get("state_size").asLong());
+		assertEquals(latestCompleted.getEndToEndDuration(), latestCheckpointNode.get("end_to_end_duration").asLong());
+		assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong());
+		assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText());
+
+		JsonNode latestSavepointNode = latestNode.get("savepoint");
+		assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong());
+		assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong());
+		assertEquals(latestSavepoint.getLatestAckTimestamp(), latestSavepointNode.get("latest_ack_timestamp").asLong());
+		assertEquals(latestSavepoint.getStateSize(), latestSavepointNode.get("state_size").asLong());
+		assertEquals(latestSavepoint.getEndToEndDuration(), latestSavepointNode.get("end_to_end_duration").asLong());
+		assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong());
+		assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText());
+
+		JsonNode latestFailedNode = latestNode.get("failed");
+		assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong());
+		assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong());
+		assertEquals(latestFailed.getLatestAckTimestamp(), latestFailedNode.get("latest_ack_timestamp").asLong());
+		assertEquals(latestFailed.getStateSize(), latestFailedNode.get("state_size").asLong());
+		assertEquals(latestFailed.getEndToEndDuration(), latestFailedNode.get("end_to_end_duration").asLong());
+		assertEquals(latestFailed.getAlignmentBuffered(), latestFailedNode.get("alignment_buffered").asLong());
+		assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong());
+		assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText());
+
+		JsonNode latestRestoredNode = latestNode.get("restored");
+		assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
+		assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
+		assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean());
+		assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText());
+
+		JsonNode historyNode = rootNode.get("history");
+		Iterator<JsonNode> it = historyNode.iterator();
+
+		assertTrue(it.hasNext());
+		JsonNode inProgressNode = it.next();
+
+		assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
+		assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean());
+		assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
+		assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
+		assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
+		assertEquals(inProgress.getEndToEndDuration(), inProgressNode.get("end_to_end_duration").asLong());
+		assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
+		assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
+		assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+
+		assertTrue(it.hasNext());
+		JsonNode completedSavepointNode = it.next();
+
+		assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
+		assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean());
+		assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
+		assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
+		assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
+		assertEquals(completedSavepoint.getEndToEndDuration(), completedSavepointNode.get("end_to_end_duration").asLong());
+		assertEquals(completedSavepoint.getAlignmentBuffered(), completedSavepointNode.get("alignment_buffered").asLong());
+		assertEquals(completedSavepoint.getNumberOfSubtasks(), completedSavepointNode.get("num_subtasks").asInt());
+		assertEquals(completedSavepoint.getNumberOfAcknowledgedSubtasks(), completedSavepointNode.get("num_acknowledged_subtasks").asInt());
+
+		assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
+		assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+
+		assertTrue(it.hasNext());
+		JsonNode failedNode = it.next();
+
+		assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
+		assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
+		assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), failedNode.get("is_savepoint").asBoolean());
+		assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
+		assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
+		assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());
+		assertEquals(failed.getEndToEndDuration(), failedNode.get("end_to_end_duration").asLong());
+		assertEquals(failed.getAlignmentBuffered(), failedNode.get("alignment_buffered").asLong());
+		assertEquals(failed.getNumberOfSubtasks(), failedNode.get("num_subtasks").asInt());
+		assertEquals(failed.getNumberOfAcknowledgedSubtasks(), failedNode.get("num_acknowledged_subtasks").asInt());
+
+		assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
+		assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
+
+		assertFalse(it.hasNext());
+	}
+}