You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/10 15:35:04 UTC

[1/3] flink git commit: [FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Repository: flink
Updated Branches:
  refs/heads/master 6b3fdc288 -> 0a286d0ff


http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index b352bae..bcb13d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
index 709c21d..deffaae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.messages;
 
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 
 /**
  * Tests for the {@link CheckpointConfigInfo}.

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
deleted file mode 100644
index 8e8a50f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
-
-import java.util.Arrays;
-
-/**
- * Tests for {@link CheckpointStatistics}.
- */
-public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<CheckpointStatistics> {
-	@Override
-	protected Class<CheckpointStatistics> getTestResponseClass() {
-		return CheckpointStatistics.class;
-	}
-
-	@Override
-	protected CheckpointStatistics getTestResponseInstance() throws Exception {
-
-		final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(1, 2, 3, 4, 5);
-		final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
-			new CheckpointStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
-			new CheckpointStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
-			new CheckpointStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
-
-		final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
-			1L,
-			CheckpointStatsStatus.COMPLETED,
-			false,
-			42L,
-			41L,
-			1337L,
-			1L,
-			0L,
-			10,
-			10,
-			null,
-			false);
-
-		final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
-			2L,
-			CheckpointStatsStatus.COMPLETED,
-			true,
-			11L,
-			10L,
-			43L,
-			1L,
-			0L,
-			9,
-			9,
-			"externalPath",
-			false);
-
-		final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
-			3L,
-			CheckpointStatsStatus.FAILED,
-			false,
-			5L,
-			10L,
-			4L,
-			2L,
-			0L,
-			11,
-			9,
-			100L,
-			"Test failure");
-
-		CheckpointStatistics.RestoredCheckpointStatistics restored = new CheckpointStatistics.RestoredCheckpointStatistics(
-			4L,
-			1445L,
-			true,
-			"foobar");
-
-		final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
-			completed,
-			savepoint,
-			failed,
-			restored);
-
-		return new CheckpointStatistics(
-			counts,
-			summary,
-			latestCheckpoints,
-			Arrays.asList(completed, savepoint, failed));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
new file mode 100644
index 0000000..8521d34
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link CheckpointingStatistics}.
+ */
+public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
+	@Override
+	protected Class<CheckpointingStatistics> getTestResponseClass() {
+		return CheckpointingStatistics.class;
+	}
+
+	@Override
+	protected CheckpointingStatistics getTestResponseInstance() throws Exception {
+
+		final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
+		final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
+			new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
+			new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
+			new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
+
+		final Map<JobVertexID, CheckpointStatistics.TaskCheckpointStatistics> checkpointStatisticsPerTask = new HashMap<>(2);
+
+		checkpointStatisticsPerTask.put(
+			new JobVertexID(),
+			new CheckpointStatistics.TaskCheckpointStatistics(
+				1L,
+				2L,
+				3L,
+				4L,
+				5,
+				6));
+
+		checkpointStatisticsPerTask.put(
+			new JobVertexID(),
+			new CheckpointStatistics.TaskCheckpointStatistics(
+				2L,
+				3L,
+				4L,
+				5L,
+				6,
+				7));
+
+		final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
+			1L,
+			CheckpointStatsStatus.COMPLETED,
+			false,
+			42L,
+			41L,
+			1337L,
+			1L,
+			0L,
+			10,
+			10,
+			Collections.emptyMap(),
+			null,
+			false);
+
+		final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
+			2L,
+			CheckpointStatsStatus.COMPLETED,
+			true,
+			11L,
+			10L,
+			43L,
+			1L,
+			0L,
+			9,
+			9,
+			checkpointStatisticsPerTask,
+			"externalPath",
+			false);
+
+		final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
+			3L,
+			CheckpointStatsStatus.FAILED,
+			false,
+			5L,
+			10L,
+			4L,
+			2L,
+			0L,
+			11,
+			9,
+			Collections.emptyMap(),
+			100L,
+			"Test failure");
+
+		CheckpointingStatistics.RestoredCheckpointStatistics restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
+			4L,
+			1445L,
+			true,
+			"foobar");
+
+		final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+			completed,
+			savepoint,
+			failed,
+			restored);
+
+		return new CheckpointingStatistics(
+			counts,
+			summary,
+			latestCheckpoints,
+			Arrays.asList(completed, savepoint, failed));
+	}
+}


[3/3] flink git commit: [FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Posted by tr...@apache.org.
[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Disable failing when not all creator properties are known

Move CheckpointStatsCache out of legacy package; Remove unused CheckpointingStatistics#generateCheckpointStatistics method

Remove JsonInclude.Include.NON_NULL from CheckpointStatistics; Pull null check out of CheckpointStatistics#generateCheckpointStatistics; Make CheckpointStatistics#checkpointStatisticsPerTask non-nullable; Add fail on missing creator property

This closes #4763.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a286d0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a286d0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a286d0f

Branch: refs/heads/master
Commit: 0a286d0ff98afa68034daff4634f526eaaf97897
Parents: 6b3fdc2
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 2 19:39:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 10 17:34:14 2017 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  29 +-
 .../rest/handler/RestHandlerConfiguration.java  |  18 +-
 .../job/AbstractExecutionGraphHandler.java      |  10 +-
 .../rest/handler/job/JobConfigHandler.java      |   5 +-
 .../checkpoints/AbstractCheckpointHandler.java  |  91 +++
 .../checkpoints/CheckpointConfigHandler.java    |   7 +-
 .../CheckpointStatisticDetailsHandler.java      |  54 ++
 .../CheckpointStatisticsHandler.java            | 181 -----
 .../job/checkpoints/CheckpointStatsCache.java   |  81 ++
 .../CheckpointingStatisticsHandler.java         | 153 ++++
 .../checkpoints/CheckpointConfigHandler.java    |   2 +-
 .../checkpoints/CheckpointStatsCache.java       |  81 --
 .../CheckpointStatsDetailsHandler.java          |   1 +
 .../CheckpointStatsDetailsSubtasksHandler.java  |   1 +
 .../checkpoints/CheckpointStatsHandler.java     |  81 +-
 .../rest/messages/CheckpointConfigHeaders.java  |  70 --
 .../rest/messages/CheckpointConfigInfo.java     | 151 ----
 .../rest/messages/CheckpointStatistics.java     | 763 -------------------
 .../messages/CheckpointStatisticsHeaders.java   |  68 --
 .../rest/messages/JobMessageParameters.java     |   2 +-
 .../checkpoints/CheckpointConfigHeaders.java    |  73 ++
 .../checkpoints/CheckpointConfigInfo.java       | 152 ++++
 .../checkpoints/CheckpointIdPathParameter.java  |  48 ++
 .../CheckpointMessageParameters.java            |  38 +
 .../CheckpointStatisticDetailsHeaders.java      |  72 ++
 .../checkpoints/CheckpointStatistics.java       | 537 +++++++++++++
 .../checkpoints/CheckpointingStatistics.java    | 478 ++++++++++++
 .../CheckpointingStatisticsHeaders.java         |  71 ++
 .../messages/json/JobVertexIDDeserializer.java  |  37 +
 .../messages/json/JobVertexIDSerializer.java    |  44 ++
 .../checkpoints/CheckpointStatsCacheTest.java   |   1 +
 .../CheckpointStatsDetailsHandlerTest.java      |   1 +
 ...heckpointStatsSubtaskDetailsHandlerTest.java |   1 +
 .../messages/CheckpointConfigInfoTest.java      |   2 +-
 .../messages/CheckpointStatisticsTest.java      | 104 ---
 .../messages/CheckpointingStatisticsTest.java   | 134 ++++
 37 files changed, 2164 insertions(+), 1480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0bf6552..1a6178f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
 import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index d64e649..2a2d9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -43,8 +45,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
-import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -52,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;
@@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	private final Executor executor;
 
 	private final ExecutionGraphCache executionGraphCache;
+	private final CheckpointStatsCache checkpointStatsCache;
 
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
@@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		this.executionGraphCache = new ExecutionGraphCache(
 			restConfiguration.getTimeout(),
 			Time.milliseconds(restConfiguration.getRefreshInterval()));
+
+		this.checkpointStatsCache = new CheckpointStatsCache(
+			restConfiguration.getMaxCheckpointStatisticCacheEntries());
 	}
 
 	@Override
@@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
-		CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler(
+		CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
-			CheckpointStatisticsHeaders.getInstance(),
+			CheckpointingStatisticsHeaders.getInstance(),
 			executionGraphCache,
 			executor);
 
+		CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			CheckpointStatisticDetailsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			checkpointStatsCache);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
-		handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
+		handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
+		handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
 
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 9220bd9..0344597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -32,14 +32,22 @@ public class RestHandlerConfiguration {
 
 	private final long refreshInterval;
 
+	private final int maxCheckpointStatisticCacheEntries;
+
 	private final Time timeout;
 
 	private final File tmpDir;
 
-	public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
+	public RestHandlerConfiguration(
+			long refreshInterval,
+			int maxCheckpointStatisticCacheEntries,
+			Time timeout,
+			File tmpDir) {
 		Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
 		this.refreshInterval = refreshInterval;
 
+		this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;
+
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.tmpDir = Preconditions.checkNotNull(tmpDir);
 	}
@@ -48,6 +56,10 @@ public class RestHandlerConfiguration {
 		return refreshInterval;
 	}
 
+	public int getMaxCheckpointStatisticCacheEntries() {
+		return maxCheckpointStatisticCacheEntries;
+	}
+
 	public Time getTimeout() {
 		return timeout;
 	}
@@ -59,10 +71,12 @@ public class RestHandlerConfiguration {
 	public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
 		final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
+		final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
+
 		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
 
 		final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
 
-		return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
+		return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index f2b1ac8..5348b55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
  *
  * @param <R> response type
  */
-public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, JobMessageParameters> {
+public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M extends JobMessageParameters> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> {
 
 	private final ExecutionGraphCache executionGraphCache;
 
@@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
-			MessageHeaders<EmptyRequestBody, R, JobMessageParameters> messageHeaders,
+			MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, messageHeaders);
@@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
 	}
 
 	@Override
-	protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+	protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
 		JobID jobId = request.getPathParameter(JobIDPathParameter.class);
 
 		CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
@@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
 		return executionGraphFuture.thenApplyAsync(
 			executionGraph -> {
 				try {
-					return handleRequest(executionGraph);
+					return handleRequest(request, executionGraph);
 				} catch (RestHandlerException rhe) {
 					throw new CompletionException(rhe);
 				}
@@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
 			executor);
 	}
 
-	protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException;
+	protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph) throws RestHandlerException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index bbe4eef..f27d84f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobConfigInfo;
@@ -35,7 +36,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job configuration.
  */
-public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo> {
+public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
 
 	public JobConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -55,7 +56,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf
 	}
 
 	@Override
-	protected JobConfigInfo handleRequest(AccessExecutionGraph executionGraph) {
+	protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
 		final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
 		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
new file mode 100644
index 0000000..62ed1a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for checkpoint-related REST handlers.
+ *
+ * @param <R> type of the response
+ */
+public abstract class AbstractCheckpointHandler<R extends ResponseBody> extends AbstractExecutionGraphHandler<R, CheckpointMessageParameters> {
+
+	private final CheckpointStatsCache checkpointStatsCache;
+
+	protected AbstractCheckpointHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, R, CheckpointMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			CheckpointStatsCache checkpointStatsCache) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+		this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache);
+	}
+
+	@Override
+	protected R handleRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class);
+
+		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
+
+		if (checkpointStatsSnapshot != null) {
+			AbstractCheckpointStats checkpointStats = checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
+
+			if (checkpointStats != null) {
+				checkpointStatsCache.tryAdd(checkpointStats);
+			} else {
+				checkpointStats = checkpointStatsCache.tryGet(checkpointId);
+			}
+
+			if (checkpointStats != null) {
+				return handleCheckpointRequest(checkpointStats);
+			} else {
+				throw new RestHandlerException("Could not find checkpointing statistics for checkpoint " + checkpointId + '.', HttpResponseStatus.NOT_FOUND);
+			}
+		} else {
+			throw new RestHandlerException("Checkpointing was not enabled for job " + executionGraph.getJobID() + '.', HttpResponseStatus.NOT_FOUND);
+		}
+	}
+
+	protected abstract R handleCheckpointRequest(AbstractCheckpointStats checkpointStats);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 94646eb..1efa7af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
@@ -40,7 +41,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint configuration.
  */
-public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> {
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
 
 	public CheckpointConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
 	}
 
 	@Override
-	protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
+	protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
 		final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
 
 		if (checkpointCoordinatorConfiguration == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
new file mode 100644
index 0000000..2fc3008
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler which returns the details for a checkpoint.
+ */
+public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics> {
+
+	public CheckpointStatisticDetailsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			CheckpointStatsCache checkpointStatsCache) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+	}
+
+	@Override
+	protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) {
+		return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
deleted file mode 100644
index 21ded78..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job.checkpoints;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler which serves the checkpoint statistics.
- */
-public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointStatistics> {
-
-	public CheckpointStatisticsHandler(
-			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
-			Time timeout,
-			MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> messageHeaders,
-			ExecutionGraphCache executionGraphCache,
-			Executor executor) {
-		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
-	}
-
-	@Override
-	protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
-
-		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
-
-		if (checkpointStatsSnapshot == null) {
-			throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND);
-		} else {
-			final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
-
-			final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(
-				checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
-				checkpointStatsCounts.getTotalNumberOfCheckpoints(),
-				checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
-				checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
-				checkpointStatsCounts.getNumberOfFailedCheckpoints());
-
-			final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
-			final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats();
-			final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
-			final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
-
-			final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
-				new CheckpointStatistics.MinMaxAvgStatistics(
-					stateSize.getMinimum(),
-					stateSize.getMaximum(),
-					stateSize.getAverage()),
-				new CheckpointStatistics.MinMaxAvgStatistics(
-					duration.getMinimum(),
-					duration.getMaximum(),
-					duration.getAverage()),
-				new CheckpointStatistics.MinMaxAvgStatistics(
-					alignment.getMinimum(),
-					alignment.getMaximum(),
-					alignment.getAverage()));
-
-			final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
-
-			final CheckpointStatistics.CompletedCheckpointStatistics completed = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint());
-			final CheckpointStatistics.CompletedCheckpointStatistics savepoint = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint());
-			final CheckpointStatistics.FailedCheckpointStatistics failed = (CheckpointStatistics.FailedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint());
-
-			final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
-
-			final CheckpointStatistics.RestoredCheckpointStatistics restored;
-
-			if (restoredCheckpointStats == null) {
-				restored = null;
-			} else {
-				restored = new CheckpointStatistics.RestoredCheckpointStatistics(
-					restoredCheckpointStats.getCheckpointId(),
-					restoredCheckpointStats.getRestoreTimestamp(),
-					restoredCheckpointStats.getProperties().isSavepoint(),
-					restoredCheckpointStats.getExternalPath());
-			}
-
-			final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
-				completed,
-				savepoint,
-				failed,
-				restored);
-
-			final List<CheckpointStatistics.BaseCheckpointStatistics> history = new ArrayList<>(16);
-
-			for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
-				history.add(generateCheckpointStatistics(abstractCheckpointStats));
-			}
-
-			return new CheckpointStatistics(
-				counts,
-				summary,
-				latestCheckpoints,
-				history);
-		}
-	}
-
-	private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
-		if (checkpointStats != null) {
-			if (checkpointStats instanceof CompletedCheckpointStats) {
-				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
-
-				return new CheckpointStatistics.CompletedCheckpointStatistics(
-					completedCheckpointStats.getCheckpointId(),
-					completedCheckpointStats.getStatus(),
-					completedCheckpointStats.getProperties().isSavepoint(),
-					completedCheckpointStats.getTriggerTimestamp(),
-					completedCheckpointStats.getLatestAckTimestamp(),
-					completedCheckpointStats.getStateSize(),
-					completedCheckpointStats.getEndToEndDuration(),
-					completedCheckpointStats.getAlignmentBuffered(),
-					completedCheckpointStats.getNumberOfSubtasks(),
-					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
-					completedCheckpointStats.getExternalPath(),
-					completedCheckpointStats.isDiscarded());
-			} else if (checkpointStats instanceof FailedCheckpointStats) {
-				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
-
-				return new CheckpointStatistics.FailedCheckpointStatistics(
-					failedCheckpointStats.getCheckpointId(),
-					failedCheckpointStats.getStatus(),
-					failedCheckpointStats.getProperties().isSavepoint(),
-					failedCheckpointStats.getTriggerTimestamp(),
-					failedCheckpointStats.getLatestAckTimestamp(),
-					failedCheckpointStats.getStateSize(),
-					failedCheckpointStats.getEndToEndDuration(),
-					failedCheckpointStats.getAlignmentBuffered(),
-					failedCheckpointStats.getNumberOfSubtasks(),
-					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
-					failedCheckpointStats.getFailureTimestamp(),
-					failedCheckpointStats.getFailureMessage());
-			} else {
-				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
-			}
-		} else {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..dcd36b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example, if you manage to click on the last
+ * checkpoint in the history, it is no longer available via the stats as
+ * soon as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+	@Nullable
+	private final Cache<Long, AbstractCheckpointStats> cache;
+
+	public CheckpointStatsCache(int maxNumEntries) {
+		if (maxNumEntries > 0) {
+			this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+				.maximumSize(maxNumEntries)
+				.build();
+		} else {
+			this.cache = null;
+		}
+	}
+
+	/**
+	 * Try to add the checkpoint to the cache.
+	 *
+	 * @param checkpoint Checkpoint to be added.
+	 */
+	public void tryAdd(AbstractCheckpointStats checkpoint) {
+		// Don't add in-progress checkpoints as they will be replaced by their
+		// completed/failed version eventually.
+		if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
+			cache.put(checkpoint.getCheckpointId(), checkpoint);
+		}
+	}
+
+	/**
+	 * Try to look up a checkpoint by its ID in the cache.
+	 *
+	 * @param checkpointId ID of the checkpoint to look up.
+	 * @return The checkpoint or <code>null</code> if checkpoint not found.
+	 */
+	public AbstractCheckpointStats tryGet(long checkpointId) {
+		if (cache != null) {
+			return cache.getIfPresent(checkpointId);
+		} else {
+			return null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
new file mode 100644
index 0000000..1c5762e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint statistics.
+ */
+public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+
+	public CheckpointingStatisticsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
+
+		if (checkpointStatsSnapshot == null) {
+			throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND);
+		} else {
+			final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
+
+			final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(
+				checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
+				checkpointStatsCounts.getTotalNumberOfCheckpoints(),
+				checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
+				checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
+				checkpointStatsCounts.getNumberOfFailedCheckpoints());
+
+			final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
+			final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats();
+			final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
+			final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
+
+			final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
+				new CheckpointingStatistics.MinMaxAvgStatistics(
+					stateSize.getMinimum(),
+					stateSize.getMaximum(),
+					stateSize.getAverage()),
+				new CheckpointingStatistics.MinMaxAvgStatistics(
+					duration.getMinimum(),
+					duration.getMaximum(),
+					duration.getAverage()),
+				new CheckpointingStatistics.MinMaxAvgStatistics(
+					alignment.getMinimum(),
+					alignment.getMaximum(),
+					alignment.getAverage()));
+
+			final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
+
+			final CheckpointStatistics.CompletedCheckpointStatistics completed = checkpointStatsHistory.getLatestCompletedCheckpoint() != null ?
+				(CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+					checkpointStatsHistory.getLatestCompletedCheckpoint(),
+					false) :
+				null;
+
+			final CheckpointStatistics.CompletedCheckpointStatistics savepoint = checkpointStatsHistory.getLatestSavepoint() != null ?
+				(CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+					checkpointStatsHistory.getLatestSavepoint(),
+					false) :
+				null;
+
+			final CheckpointStatistics.FailedCheckpointStatistics failed = checkpointStatsHistory.getLatestFailedCheckpoint() != null ?
+				(CheckpointStatistics.FailedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+					checkpointStatsHistory.getLatestFailedCheckpoint(),
+					false) :
+				null;
+
+			final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
+
+			final CheckpointingStatistics.RestoredCheckpointStatistics restored;
+
+			if (restoredCheckpointStats == null) {
+				restored = null;
+			} else {
+				restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
+					restoredCheckpointStats.getCheckpointId(),
+					restoredCheckpointStats.getRestoreTimestamp(),
+					restoredCheckpointStats.getProperties().isSavepoint(),
+					restoredCheckpointStats.getExternalPath());
+			}
+
+			final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+				completed,
+				savepoint,
+				failed,
+				restored);
+
+			final List<CheckpointStatistics> history = new ArrayList<>(16);
+
+			for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
+				history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, false));
+			}
+
+			return new CheckpointingStatistics(
+				counts,
+				summary,
+				latestCheckpoints,
+				history);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index f50c42d..60b9799 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
deleted file mode 100644
index f21fc76..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-/**
- * A size-based cache of accessed checkpoints for completed and failed
- * checkpoints.
- *
- * <p>Having this cache in place for accessed stats improves the user
- * experience quite a bit as accessed checkpoint stats stay available
- * and don't expire. For example if you manage to click on the last
- * checkpoint in the history, it is not available via the stats as soon
- * as another checkpoint is triggered. With the cache in place, the
- * checkpoint will still be available for investigation.
- */
-public class CheckpointStatsCache {
-
-	@Nullable
-	private final Cache<Long, AbstractCheckpointStats> cache;
-
-	public CheckpointStatsCache(int maxNumEntries) {
-		if (maxNumEntries > 0) {
-			this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
-				.maximumSize(maxNumEntries)
-				.build();
-		} else {
-			this.cache = null;
-		}
-	}
-
-	/**
-	 * Try to add the checkpoint to the cache.
-	 *
-	 * @param checkpoint Checkpoint to be added.
-	 */
-	void tryAdd(AbstractCheckpointStats checkpoint) {
-		// Don't add in progress checkpoints as they will be replaced by their
-		// completed/failed version eventually.
-		if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
-			cache.put(checkpoint.getCheckpointId(), checkpoint);
-		}
-	}
-
-	/**
-	 * Try to look up a checkpoint by it's ID in the cache.
-	 *
-	 * @param checkpointId ID of the checkpoint to look up.
-	 * @return The checkpoint or <code>null</code> if checkpoint not found.
-	 */
-	AbstractCheckpointStats tryGet(long checkpointId) {
-		if (cache != null) {
-			return cache.getIfPresent(checkpointId);
-		} else {
-			return null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index e277971..dce1641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 5420cf4..1421fb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index 5b35c7f..b6c86be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
@@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
-		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS);
-		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
-		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
-		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
-		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
-		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
+		gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS);
+		gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
+		gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
+		gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
+		gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
+		gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
 		gen.writeEndObject();
 	}
 
 	private static void writeSummary(
 		JsonGenerator gen,
 		CompletedCheckpointStatsSummary summary) throws IOException {
-		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY);
-		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE);
+		gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY);
+		gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE);
 		writeMinMaxAvg(gen, summary.getStateSizeStats());
 		gen.writeEndObject();
 
-		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION);
+		gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION);
 		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
 		gen.writeEndObject();
 
-		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
+		gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
 		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
 		gen.writeEndObject();
 		gen.writeEndObject();
 	}
 
 	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
-		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
-		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
-		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
+		gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
+		gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
+		gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
 	}
 
 	private static void writeLatestCheckpoints(
@@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		@Nullable FailedCheckpointStats failed,
 		@Nullable RestoredCheckpointStats restored) throws IOException {
 
-		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
+		gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
 		// Completed checkpoint
 		if (completed != null) {
-			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
+			gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
 			writeCheckpoint(gen, completed);
 
 			String externalPath = completed.getExternalPath();
@@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 		// Completed savepoint
 		if (savepoint != null) {
-			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
+			gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
 			writeCheckpoint(gen, savepoint);
 
 			String externalPath = savepoint.getExternalPath();
@@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 		// Failed checkpoint
 		if (failed != null) {
-			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
+			gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
 			writeCheckpoint(gen, failed);
 
 			gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp());
@@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 		// Restored checkpoint
 		if (restored != null) {
-			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
-			gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
-			gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
-			gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
+			gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
+			gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
+			gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
+			gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
 
 			String externalPath = restored.getExternalPath();
 			if (externalPath != null) {
-				gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
+				gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
 			}
 			gen.writeEndObject();
 		}
@@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
-		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+		gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
 
 	}
 
 	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
-		gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY);
+		gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY);
 		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
 			gen.writeStartObject();
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
-			gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
-			gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
-			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+			gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
+			gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
+			gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
 
 			if (checkpoint.getStatus().isCompleted()) {
 				// --- Completed ---

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
deleted file mode 100644
index bfc0b7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
-
-	private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
-
-	public static final String URL = "/jobs/:jobid/checkpoints/config";
-
-	private CheckpointConfigHeaders() {}
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public Class<CheckpointConfigInfo> getResponseClass() {
-		return CheckpointConfigInfo.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public JobMessageParameters getUnresolvedMessageParameters() {
-		return new JobMessageParameters();
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return URL;
-	}
-
-	public static CheckpointConfigHeaders getInstance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
deleted file mode 100644
index fbda12a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Response class of the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigInfo implements ResponseBody {
-
-	public static final String FIELD_NAME_PROCESSING_MODE = "mode";
-
-	public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
-
-	public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
-
-	public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
-
-	public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
-
-	public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
-
-	@JsonProperty(FIELD_NAME_PROCESSING_MODE)
-	private final ProcessingMode processingMode;
-
-	@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
-	private final long checkpointInterval;
-
-	@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
-	private final long checkpointTimeout;
-
-	@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
-	private final long minPauseBetweenCheckpoints;
-
-	@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
-	private final long maxConcurrentCheckpoints;
-
-	@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
-	private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
-
-	@JsonCreator
-	public CheckpointConfigInfo(
-			@JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
-			@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
-			@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
-			@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
-			@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
-			@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
-		this.processingMode = Preconditions.checkNotNull(processingMode);
-		this.checkpointInterval = checkpointInterval;
-		this.checkpointTimeout = checkpointTimeout;
-		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-		this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		CheckpointConfigInfo that = (CheckpointConfigInfo) o;
-		return checkpointInterval == that.checkpointInterval &&
-			checkpointTimeout == that.checkpointTimeout &&
-			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
-			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
-			processingMode == that.processingMode &&
-			Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
-	}
-
-	/**
-	 * Contains information about the externalized checkpoint configuration.
-	 */
-	public static final class ExternalizedCheckpointInfo {
-
-		public static final String FIELD_NAME_ENABLED = "enabled";
-
-		public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
-
-		@JsonProperty(FIELD_NAME_ENABLED)
-		private final boolean enabled;
-
-		@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
-		private final boolean deleteOnCancellation;
-
-		@JsonCreator
-		public ExternalizedCheckpointInfo(
-				@JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
-				@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
-			this.enabled = enabled;
-			this.deleteOnCancellation = deleteOnCancellation;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
-			return enabled == that.enabled &&
-				deleteOnCancellation == that.deleteOnCancellation;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(enabled, deleteOnCancellation);
-		}
-	}
-
-	/**
-	 * Processing mode.
-	 */
-	public enum ProcessingMode {
-		AT_LEAST_ONCE,
-		EXACTLY_ONCE
-	}
-}


[2/3] flink git commit: [FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
deleted file mode 100644
index ade8c7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Response of the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatistics implements ResponseBody {
-
-	public static final String FIELD_NAME_COUNTS = "counts";
-
-	public static final String FIELD_NAME_SUMMARY = "summary";
-
-	public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
-
-	public static final String FIELD_NAME_HISTORY = "history";
-
-	@JsonProperty(FIELD_NAME_COUNTS)
-	private final Counts counts;
-
-	@JsonProperty(FIELD_NAME_SUMMARY)
-	private final Summary summary;
-
-	@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
-	private final LatestCheckpoints latestCheckpoints;
-
-	@JsonProperty(FIELD_NAME_HISTORY)
-	private final List<BaseCheckpointStatistics> history;
-
-	@JsonCreator
-	public CheckpointStatistics(
-			@JsonProperty(FIELD_NAME_COUNTS) Counts counts,
-			@JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
-			@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
-			@JsonProperty(FIELD_NAME_HISTORY) List<BaseCheckpointStatistics> history) {
-		this.counts = Preconditions.checkNotNull(counts);
-		this.summary = Preconditions.checkNotNull(summary);
-		this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints);
-		this.history = Preconditions.checkNotNull(history);
-	}
-
-	public Counts getCounts() {
-		return counts;
-	}
-
-	public Summary getSummary() {
-		return summary;
-	}
-
-	public LatestCheckpoints getLatestCheckpoints() {
-		return latestCheckpoints;
-	}
-
-	public List<BaseCheckpointStatistics> getHistory() {
-		return history;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		CheckpointStatistics that = (CheckpointStatistics) o;
-		return Objects.equals(counts, that.counts) &&
-			Objects.equals(summary, that.summary) &&
-			Objects.equals(latestCheckpoints, that.latestCheckpoints) &&
-			Objects.equals(history, that.history);
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(counts, summary, latestCheckpoints, history);
-	}
-
-	// ------------------------------------------------------------------
-	// Inner classes
-	// ------------------------------------------------------------------
-
-	/**
-	 * Checkpoint counts.
-	 */
-	public static final class Counts {
-
-		public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored";
-
-		public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total";
-
-		public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress";
-
-		public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed";
-
-		public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed";
-
-		@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
-		private final long numberRestoredCheckpoints;
-
-		@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
-		private final long totalNumberCheckpoints;
-
-		@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
-		private final int numberInProgressCheckpoints;
-
-		@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
-		private final long numberCompletedCheckpoints;
-
-		@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
-		private final long numberFailedCheckpoints;
-
-		@JsonCreator
-		public Counts(
-				@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints,
-				@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints,
-				@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints,
-				@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints,
-				@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) {
-			this.numberRestoredCheckpoints = numberRestoredCheckpoints;
-			this.totalNumberCheckpoints = totalNumberCheckpoints;
-			this.numberInProgressCheckpoints = numberInProgressCheckpoints;
-			this.numberCompletedCheckpoints = numberCompletedCheckpoints;
-			this.numberFailedCheckpoints = numberFailedCheckpoints;
-		}
-
-		public long getNumberRestoredCheckpoints() {
-			return numberRestoredCheckpoints;
-		}
-
-		public long getTotalNumberCheckpoints() {
-			return totalNumberCheckpoints;
-		}
-
-		public int getNumberInProgressCheckpoints() {
-			return numberInProgressCheckpoints;
-		}
-
-		public long getNumberCompletedCheckpoints() {
-			return numberCompletedCheckpoints;
-		}
-
-		public long getNumberFailedCheckpoints() {
-			return numberFailedCheckpoints;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			Counts counts = (Counts) o;
-			return numberRestoredCheckpoints == counts.numberRestoredCheckpoints &&
-				totalNumberCheckpoints == counts.totalNumberCheckpoints &&
-				numberInProgressCheckpoints == counts.numberInProgressCheckpoints &&
-				numberCompletedCheckpoints == counts.numberCompletedCheckpoints &&
-				numberFailedCheckpoints == counts.numberFailedCheckpoints;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints);
-		}
-	}
-
-	/**
-	 * Checkpoint summary.
-	 */
-	public static final class Summary {
-
-		public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
-		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
-
-		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
-
-		@JsonProperty(FIELD_NAME_STATE_SIZE)
-		private final MinMaxAvgStatistics stateSize;
-
-		@JsonProperty(FIELD_NAME_DURATION)
-		private final MinMaxAvgStatistics duration;
-
-		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
-		private final MinMaxAvgStatistics alignmentBuffered;
-
-		@JsonCreator
-		public Summary(
-				@JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
-				@JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
-				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) {
-			this.stateSize = stateSize;
-			this.duration = duration;
-			this.alignmentBuffered = alignmentBuffered;
-		}
-
-		public MinMaxAvgStatistics getStateSize() {
-			return stateSize;
-		}
-
-		public MinMaxAvgStatistics getDuration() {
-			return duration;
-		}
-
-		public MinMaxAvgStatistics getAlignmentBuffered() {
-			return alignmentBuffered;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			Summary summary = (Summary) o;
-			return Objects.equals(stateSize, summary.stateSize) &&
-				Objects.equals(duration, summary.duration) &&
-				Objects.equals(alignmentBuffered, summary.alignmentBuffered);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(stateSize, duration, alignmentBuffered);
-		}
-	}
-
-	/**
-	 * Minimum, maximum and average statistics.
-	 */
-	public static final class MinMaxAvgStatistics {
-
-		public static final String FIELD_NAME_MINIMUM = "min";
-
-		public static final String FIELD_NAME_MAXIMUM = "max";
-
-		public static final String FIELD_NAME_AVERAGE = "avg";
-
-		@JsonProperty(FIELD_NAME_MINIMUM)
-		private final long minimum;
-
-		@JsonProperty(FIELD_NAME_MAXIMUM)
-		private final long maximum;
-
-		@JsonProperty(FIELD_NAME_AVERAGE)
-		private final long average;
-
-		@JsonCreator
-		public MinMaxAvgStatistics(
-				@JsonProperty(FIELD_NAME_MINIMUM) long minimum,
-				@JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
-				@JsonProperty(FIELD_NAME_AVERAGE) long average) {
-			this.minimum = minimum;
-			this.maximum = maximum;
-			this.average = average;
-		}
-
-		public long getMinimum() {
-			return minimum;
-		}
-
-		public long getMaximum() {
-			return maximum;
-		}
-
-		public long getAverage() {
-			return average;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
-			return minimum == that.minimum &&
-				maximum == that.maximum &&
-				average == that.average;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(minimum, maximum, average);
-		}
-	}
-
-	/**
-	 * Statistics about the latest checkpoints.
-	 */
-	public static final class LatestCheckpoints {
-
-		public static final String FIELD_NAME_COMPLETED = "completed";
-
-		public static final String FIELD_NAME_SAVEPOINT = "savepoint";
-
-		public static final String FIELD_NAME_FAILED = "failed";
-
-		public static final String FIELD_NAME_RESTORED = "restored";
-
-		@JsonProperty(FIELD_NAME_COMPLETED)
-		@Nullable
-		private final CompletedCheckpointStatistics completedCheckpointStatistics;
-
-		@JsonProperty(FIELD_NAME_SAVEPOINT)
-		@Nullable
-		private final CompletedCheckpointStatistics savepointStatistics;
-
-		@JsonProperty(FIELD_NAME_FAILED)
-		@Nullable
-		private final FailedCheckpointStatistics failedCheckpointStatistics;
-
-		@JsonProperty(FIELD_NAME_RESTORED)
-		@Nullable
-		private final RestoredCheckpointStatistics restoredCheckpointStatistics;
-
-		@JsonCreator
-		public LatestCheckpoints(
-				@JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics,
-				@JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics,
-				@JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics,
-				@JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
-			this.completedCheckpointStatistics = completedCheckpointStatistics;
-			this.savepointStatistics = savepointStatistics;
-			this.failedCheckpointStatistics = failedCheckpointStatistics;
-			this.restoredCheckpointStatistics = restoredCheckpointStatistics;
-		}
-
-		@Nullable
-		public CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
-			return completedCheckpointStatistics;
-		}
-
-		@Nullable
-		public CompletedCheckpointStatistics getSavepointStatistics() {
-			return savepointStatistics;
-		}
-
-		@Nullable
-		public FailedCheckpointStatistics getFailedCheckpointStatistics() {
-			return failedCheckpointStatistics;
-		}
-
-		@Nullable
-		public RestoredCheckpointStatistics getRestoredCheckpointStatistics() {
-			return restoredCheckpointStatistics;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			LatestCheckpoints that = (LatestCheckpoints) o;
-			return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) &&
-				Objects.equals(savepointStatistics, that.savepointStatistics) &&
-				Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) &&
-				Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
-		}
-	}
-
-	/**
-	 * Statistics for a checkpoint.
-	 */
-	@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
-	@JsonSubTypes({
-		@JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"),
-		@JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")})
-	public static class BaseCheckpointStatistics {
-
-		public static final String FIELD_NAME_ID = "id";
-
-		public static final String FIELD_NAME_STATUS = "status";
-
-		public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
-
-		public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
-
-		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
-
-		public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
-		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
-
-		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
-
-		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
-
-		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
-
-		@JsonProperty(FIELD_NAME_ID)
-		private final long id;
-
-		@JsonProperty(FIELD_NAME_STATUS)
-		private final CheckpointStatsStatus status;
-
-		@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
-		private final boolean savepoint;
-
-		@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
-		private final long triggerTimestamp;
-
-		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
-		private final long latestAckTimestamp;
-
-		@JsonProperty(FIELD_NAME_STATE_SIZE)
-		private final long stateSize;
-
-		@JsonProperty(FIELD_NAME_DURATION)
-		private final long duration;
-
-		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
-		private final long alignmentBuffered;
-
-		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
-		private final int numSubtasks;
-
-		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
-		private final int numAckSubtasks;
-
-		@JsonCreator
-		protected BaseCheckpointStatistics(
-				@JsonProperty(FIELD_NAME_ID) long id,
-				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
-				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
-				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
-				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
-				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
-				@JsonProperty(FIELD_NAME_DURATION) long duration,
-				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
-				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
-				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
-			this.id = id;
-			this.status = Preconditions.checkNotNull(status);
-			this.savepoint = savepoint;
-			this.triggerTimestamp = triggerTimestamp;
-			this.latestAckTimestamp = latestAckTimestamp;
-			this.stateSize = stateSize;
-			this.duration = duration;
-			this.alignmentBuffered = alignmentBuffered;
-			this.numSubtasks = numSubtasks;
-			this.numAckSubtasks = numAckSubtasks;
-		}
-
-		public long getId() {
-			return id;
-		}
-
-		public CheckpointStatsStatus getStatus() {
-			return status;
-		}
-
-		public boolean isSavepoint() {
-			return savepoint;
-		}
-
-		public long getTriggerTimestamp() {
-			return triggerTimestamp;
-		}
-
-		public long getLatestAckTimestamp() {
-			return latestAckTimestamp;
-		}
-
-		public long getStateSize() {
-			return stateSize;
-		}
-
-		public long getDuration() {
-			return duration;
-		}
-
-		public long getAlignmentBuffered() {
-			return alignmentBuffered;
-		}
-
-		public int getNumSubtasks() {
-			return numSubtasks;
-		}
-
-		public int getNumAckSubtasks() {
-			return numAckSubtasks;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			BaseCheckpointStatistics that = (BaseCheckpointStatistics) o;
-			return id == that.id &&
-				savepoint == that.savepoint &&
-				triggerTimestamp == that.triggerTimestamp &&
-				latestAckTimestamp == that.latestAckTimestamp &&
-				stateSize == that.stateSize &&
-				duration == that.duration &&
-				alignmentBuffered == that.alignmentBuffered &&
-				numSubtasks == that.numSubtasks &&
-				numAckSubtasks == that.numAckSubtasks &&
-				status == that.status;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
-		}
-	}
-
-	/**
-	 * Statistics for a completed checkpoint.
-	 */
-	public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics {
-
-		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
-
-		public static final String FIELD_NAME_DISCARDED = "discarded";
-
-		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
-		@Nullable
-		private final String externalPath;
-
-		@JsonProperty(FIELD_NAME_DISCARDED)
-		private final boolean discarded;
-
-		@JsonCreator
-		public CompletedCheckpointStatistics(
-				@JsonProperty(FIELD_NAME_ID) long id,
-				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
-				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
-				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
-				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
-				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
-				@JsonProperty(FIELD_NAME_DURATION) long duration,
-				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
-				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
-				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-				@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
-				@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
-			super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
-
-			this.externalPath = externalPath;
-			this.discarded = discarded;
-		}
-
-		@Nullable
-		public String getExternalPath() {
-			return externalPath;
-		}
-
-		public boolean isDiscarded() {
-			return discarded;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			if (!super.equals(o)) {
-				return false;
-			}
-			CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
-			return discarded == that.discarded &&
-				Objects.equals(externalPath, that.externalPath);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(super.hashCode(), externalPath, discarded);
-		}
-	}
-
-	/**
-	 * Statistics for a failed checkpoint.
-	 */
-	public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics {
-
-		public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
-
-		public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
-
-		@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
-		private final long failureTimestamp;
-
-		@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
-		@Nullable
-		private final String failureMessage;
-
-		@JsonCreator
-		public FailedCheckpointStatistics(
-				@JsonProperty(FIELD_NAME_ID) long id,
-				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
-				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
-				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
-				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
-				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
-				@JsonProperty(FIELD_NAME_DURATION) long duration,
-				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
-				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
-				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-				@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
-				@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
-			super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
-
-			this.failureTimestamp = failureTimestamp;
-			this.failureMessage = failureMessage;
-		}
-
-		public long getFailureTimestamp() {
-			return failureTimestamp;
-		}
-
-		@Nullable
-		public String getFailureMessage() {
-			return failureMessage;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			if (!super.equals(o)) {
-				return false;
-			}
-			FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
-			return failureTimestamp == that.failureTimestamp &&
-				Objects.equals(failureMessage, that.failureMessage);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
-		}
-	}
-
-	/**
-	 * Statistics for a restored checkpoint.
-	 */
-	public static final class RestoredCheckpointStatistics {
-
-		public static final String FIELD_NAME_ID = "id";
-
-		public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp";
-
-		public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
-
-		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
-
-		@JsonProperty(FIELD_NAME_ID)
-		private final long id;
-
-		@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
-		private final long restoreTimestamp;
-
-		@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
-		private final boolean savepoint;
-
-		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
-		@Nullable
-		private final String externalPath;
-
-		@JsonCreator
-		public RestoredCheckpointStatistics(
-				@JsonProperty(FIELD_NAME_ID) long id,
-				@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp,
-				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
-				@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) {
-			this.id = id;
-			this.restoreTimestamp = restoreTimestamp;
-			this.savepoint = savepoint;
-			this.externalPath = externalPath;
-		}
-
-		public long getId() {
-			return id;
-		}
-
-		public long getRestoreTimestamp() {
-			return restoreTimestamp;
-		}
-
-		public boolean isSavepoint() {
-			return savepoint;
-		}
-
-		@Nullable
-		public String getExternalPath() {
-			return externalPath;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o;
-			return id == that.id &&
-				restoreTimestamp == that.restoreTimestamp &&
-				savepoint == that.savepoint &&
-				Objects.equals(externalPath, that.externalPath);
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(id, restoreTimestamp, savepoint, externalPath);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
deleted file mode 100644
index b062d0d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> {
-
-	private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders();
-
-	public static final String URL = "/jobs/:jobid/checkpoints";
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public Class<CheckpointStatistics> getResponseClass() {
-		return CheckpointStatistics.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public JobMessageParameters getUnresolvedMessageParameters() {
-		return new JobMessageParameters();
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return URL;
-	}
-
-	public static CheckpointStatisticsHeaders getInstance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
index 9d74c95..1155892 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -26,7 +26,7 @@ import java.util.Collections;
  */
 public class JobMessageParameters extends MessageParameters {
 
-	private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+	protected final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
 
 	@Override
 	public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
new file mode 100644
index 0000000..f0526a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+
+	private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
+
+	public static final String URL = "/jobs/:jobid/checkpoints/config";
+
+	private CheckpointConfigHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<CheckpointConfigInfo> getResponseClass() {
+		return CheckpointConfigInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static CheckpointConfigHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
new file mode 100644
index 0000000..797d3a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ *
+ * <p>Immutable value object whose fields Jackson maps to the {@code FIELD_NAME_*} JSON keys.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+	public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+	public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+	public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
+	public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
+	public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
+
+	@JsonProperty(FIELD_NAME_PROCESSING_MODE)
+	private final ProcessingMode processingMode;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
+	private final long checkpointInterval;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
+	private final long checkpointTimeout;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
+	private final long minPauseBetweenCheckpoints;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
+	private final int maxConcurrentCheckpoints;
+
+	@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
+	private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
+
+	/**
+	 * Creates the response; the mode and the externalization info are checked with {@code checkNotNull}.
+	 */
+	@JsonCreator
+	public CheckpointConfigInfo(
+			@JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
+			@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
+		this.processingMode = Preconditions.checkNotNull(processingMode);
+		this.checkpointInterval = checkpointInterval;
+		this.checkpointTimeout = checkpointTimeout;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+		this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CheckpointConfigInfo that = (CheckpointConfigInfo) o;
+		return checkpointInterval == that.checkpointInterval &&
+			checkpointTimeout == that.checkpointTimeout &&
+			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
+			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
+			processingMode == that.processingMode &&
+			Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
+	}
+
+	/**
+	 * Contains information about the externalized checkpoint configuration.
+	 */
+	public static final class ExternalizedCheckpointInfo {
+
+		public static final String FIELD_NAME_ENABLED = "enabled";
+
+		public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
+
+		@JsonProperty(FIELD_NAME_ENABLED)
+		private final boolean enabled;
+
+		@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
+		private final boolean deleteOnCancellation;
+
+		@JsonCreator
+		public ExternalizedCheckpointInfo(
+				@JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
+				@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
+			this.enabled = enabled;
+			this.deleteOnCancellation = deleteOnCancellation;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
+			return enabled == that.enabled &&
+				deleteOnCancellation == that.deleteOnCancellation;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(enabled, deleteOnCancellation);
+		}
+	}
+
+	/**
+	 * Processing mode.
+	 */
+	public enum ProcessingMode {
+		AT_LEAST_ONCE,
+		EXACTLY_ONCE
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
new file mode 100644
index 0000000..c08cc82
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/**
+ * Path parameter carrying a checkpoint id, registered under {@link #KEY} and typed as {@link Long}.
+ */
+public class CheckpointIdPathParameter extends MessagePathParameter<Long> {
+
+	public static final String KEY = "checkpointid";
+
+	protected CheckpointIdPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected Long convertFromString(String value) throws ConversionException {
+		try {
+			return Long.valueOf(value);
+		} catch (NumberFormatException cause) {
+			throw new ConversionException("Could not parse long from " + value + '.', cause);
+		}
+	}
+
+	@Override
+	protected String convertToString(Long value) {
+		return Long.toString(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
new file mode 100644
index 0000000..040aa87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for checkpoint related messages: the job path parameter plus the checkpoint id.
+ */
+public class CheckpointMessageParameters extends JobMessageParameters {
+
+	protected final CheckpointIdPathParameter checkpointIdPathParameter = new CheckpointIdPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Arrays.<MessagePathParameter<?>>asList(jobPathParameter, checkpointIdPathParameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
new file mode 100644
index 0000000..3d7ba2b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for the {@link CheckpointStatisticDetailsHandler}: a {@code GET} on {@link #URL} with an
+ * empty request body, answered with a {@link CheckpointStatistics} and HTTP status {@code OK}.
+ */
+public class CheckpointStatisticDetailsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> {
+
+	public static final String URL = "/jobs/:jobid/checkpoints/:checkpointid";
+	private static final CheckpointStatisticDetailsHeaders INSTANCE = new CheckpointStatisticDetailsHeaders();
+
+	private CheckpointStatisticDetailsHeaders() {}
+
+	public static CheckpointStatisticDetailsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<CheckpointStatistics> getResponseClass() {
+		return CheckpointStatistics.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public CheckpointMessageParameters getUnresolvedMessageParameters() {
+		return new CheckpointMessageParameters();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
new file mode 100644
index 0000000..9fb1094
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ *
+ * <p>Serialized polymorphically: the {@code "@class"} property is {@code completed} or {@code failed}.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+public class CheckpointStatistics implements ResponseBody {
+
+	public static final String FIELD_NAME_ID = "id";
+	public static final String FIELD_NAME_STATUS = "status";
+	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
+	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+	public static final String FIELD_NAME_STATE_SIZE = "state_size";
+	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+	public static final String FIELD_NAME_TASKS = "tasks";
+
+	@JsonProperty(FIELD_NAME_ID)
+	private final long id;
+
+	@JsonProperty(FIELD_NAME_STATUS)
+	private final CheckpointStatsStatus status;
+
+	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+	private final boolean savepoint;
+
+	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+	private final long triggerTimestamp;
+
+	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+	private final long latestAckTimestamp;
+
+	@JsonProperty(FIELD_NAME_STATE_SIZE)
+	private final long stateSize;
+
+	@JsonProperty(FIELD_NAME_DURATION)
+	private final long duration;
+
+	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+	private final long alignmentBuffered;
+
+	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+	private final int numSubtasks;
+
+	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+	private final int numAckSubtasks;
+
+	@JsonProperty(FIELD_NAME_TASKS)
+	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
+
+	@JsonCreator
+	private CheckpointStatistics(
+			@JsonProperty(FIELD_NAME_ID) long id,
+			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+		this.id = id;
+		this.status = Preconditions.checkNotNull(status);
+		this.savepoint = savepoint;
+		this.triggerTimestamp = triggerTimestamp;
+		this.latestAckTimestamp = latestAckTimestamp;
+		this.stateSize = stateSize;
+		this.duration = duration;
+		this.alignmentBuffered = alignmentBuffered;
+		this.numSubtasks = numSubtasks;
+		this.numAckSubtasks = numAckSubtasks;
+		this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask);
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public CheckpointStatsStatus getStatus() {
+		return status;
+	}
+
+	public boolean isSavepoint() {
+		return savepoint;
+	}
+
+	public long getTriggerTimestamp() {
+		return triggerTimestamp;
+	}
+
+	public long getLatestAckTimestamp() {
+		return latestAckTimestamp;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	public long getAlignmentBuffered() {
+		return alignmentBuffered;
+	}
+
+	public int getNumSubtasks() {
+		return numSubtasks;
+	}
+
+	public int getNumAckSubtasks() {
+		return numAckSubtasks;
+	}
+
+	/**
+	 * Per-task statistics keyed by job vertex id; never {@code null} (enforced by the constructor), possibly empty.
+	 */
+	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
+		return checkpointStatisticsPerTask;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CheckpointStatistics that = (CheckpointStatistics) o;
+		return id == that.id &&
+			savepoint == that.savepoint &&
+			triggerTimestamp == that.triggerTimestamp &&
+			latestAckTimestamp == that.latestAckTimestamp &&
+			stateSize == that.stateSize &&
+			duration == that.duration &&
+			alignmentBuffered == that.alignmentBuffered &&
+			numSubtasks == that.numSubtasks &&
+			numAckSubtasks == that.numAckSubtasks &&
+			status == that.status &&
+			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
+	}
+
+	// -------------------------------------------------------------------------
+	// Static factory methods
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Converts runtime checkpoint stats into their REST representation.
+	 *
+	 * @param checkpointStats stats to convert (null-checked); must be a {@link CompletedCheckpointStats}
+	 *                        or a {@link FailedCheckpointStats}
+	 * @param includeTaskCheckpointStatistics if false, the per-task map of the result is left empty
+	 * @return a {@link CompletedCheckpointStatistics} or a {@link FailedCheckpointStatistics}
+	 * @throws IllegalArgumentException if {@code checkpointStats} is of any other type
+	 */
+	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
+		Preconditions.checkNotNull(checkpointStats);
+
+		Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
+
+		if (includeTaskCheckpointStatistics) {
+			Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
+
+			checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
+
+			for (TaskStateStats taskStateStat : taskStateStats) {
+				checkpointStatisticsPerTask.put(
+					taskStateStat.getJobVertexId(),
+					new TaskCheckpointStatistics(
+						taskStateStat.getLatestAckTimestamp(),
+						taskStateStat.getStateSize(),
+						taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+						taskStateStat.getAlignmentBuffered(),
+						taskStateStat.getNumberOfSubtasks(),
+						taskStateStat.getNumberOfAcknowledgedSubtasks()));
+			}
+		} else {
+			// nothing requested: share the immutable empty map instead of allocating one
+			checkpointStatisticsPerTask = Collections.emptyMap();
+		}
+
+		// map onto the matching subtype; any other stats type is rejected in the final else branch
+		if (checkpointStats instanceof CompletedCheckpointStats) {
+			final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
+
+			return new CheckpointStatistics.CompletedCheckpointStatistics(
+				completedCheckpointStats.getCheckpointId(),
+				completedCheckpointStats.getStatus(),
+				completedCheckpointStats.getProperties().isSavepoint(),
+				completedCheckpointStats.getTriggerTimestamp(),
+				completedCheckpointStats.getLatestAckTimestamp(),
+				completedCheckpointStats.getStateSize(),
+				completedCheckpointStats.getEndToEndDuration(),
+				completedCheckpointStats.getAlignmentBuffered(),
+				completedCheckpointStats.getNumberOfSubtasks(),
+				completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+				checkpointStatisticsPerTask,
+				completedCheckpointStats.getExternalPath(),
+				completedCheckpointStats.isDiscarded());
+		} else if (checkpointStats instanceof FailedCheckpointStats) {
+			final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
+
+			return new CheckpointStatistics.FailedCheckpointStatistics(
+				failedCheckpointStats.getCheckpointId(),
+				failedCheckpointStats.getStatus(),
+				failedCheckpointStats.getProperties().isSavepoint(),
+				failedCheckpointStats.getTriggerTimestamp(),
+				failedCheckpointStats.getLatestAckTimestamp(),
+				failedCheckpointStats.getStateSize(),
+				failedCheckpointStats.getEndToEndDuration(),
+				failedCheckpointStats.getAlignmentBuffered(),
+				failedCheckpointStats.getNumberOfSubtasks(),
+				failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+				checkpointStatisticsPerTask,
+				failedCheckpointStats.getFailureTimestamp(),
+				failedCheckpointStats.getFailureMessage());
+		} else {
+			throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
+		}
+	}
+
+	// ---------------------------------------------------------------------
+	// Static inner classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Checkpoint statistics for a single task.
+	 */
+	public static final class TaskCheckpointStatistics {
+
+		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+		private final long latestAckTimestamp;
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final long stateSize;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final long duration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+		private final long alignmentBuffered;
+
+		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+		private final int numSubtasks;
+
+		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+		private final int numAckSubtasks;
+
+		@JsonCreator
+		public TaskCheckpointStatistics(
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
+			this.latestAckTimestamp = latestAckTimestamp;
+			this.stateSize = stateSize;
+			this.duration = duration;
+			this.alignmentBuffered = alignmentBuffered;
+			this.numSubtasks = numSubtasks;
+			this.numAckSubtasks = numAckSubtasks;
+		}
+
+		public long getLatestAckTimestamp() {
+			return latestAckTimestamp;
+		}
+
+		public long getStateSize() {
+			return stateSize;
+		}
+
+		public long getDuration() {
+			return duration;
+		}
+
+		public long getAlignmentBuffered() {
+			return alignmentBuffered;
+		}
+
+		public int getNumSubtasks() {
+			return numSubtasks;
+		}
+
+		public int getNumAckSubtasks() {
+			return numAckSubtasks;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
+			return latestAckTimestamp == that.latestAckTimestamp &&
+				stateSize == that.stateSize &&
+				duration == that.duration &&
+				alignmentBuffered == that.alignmentBuffered &&
+				numSubtasks == that.numSubtasks &&
+				numAckSubtasks == that.numAckSubtasks;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+		}
+	}
+
+	/**
+	 * Statistics for a completed checkpoint; adds the (nullable) external path and the discarded flag.
+	 */
+	public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
+
+		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+		public static final String FIELD_NAME_DISCARDED = "discarded";
+
+		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+		@Nullable
+		private final String externalPath;
+
+		@JsonProperty(FIELD_NAME_DISCARDED)
+		private final boolean discarded;
+
+		@JsonCreator
+		public CompletedCheckpointStatistics(
+			@JsonProperty(FIELD_NAME_ID) long id,
+			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
+			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
+			super(
+				id,
+				status,
+				savepoint,
+				triggerTimestamp,
+				latestAckTimestamp,
+				stateSize,
+				duration,
+				alignmentBuffered,
+				numSubtasks,
+				numAckSubtasks,
+				checkpointingStatisticsPerTask);
+
+			this.externalPath = externalPath;
+			this.discarded = discarded;
+		}
+
+		@Nullable
+		public String getExternalPath() {
+			return externalPath;
+		}
+
+		public boolean isDiscarded() {
+			return discarded;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			if (!super.equals(o)) {
+				return false;
+			}
+			CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
+			return discarded == that.discarded &&
+				Objects.equals(externalPath, that.externalPath);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), externalPath, discarded);
+		}
+	}
+
+	/**
+	 * Statistics for a failed checkpoint; adds the failure timestamp and the (nullable) failure message.
+	 */
+	public static final class FailedCheckpointStatistics extends CheckpointStatistics {
+
+		public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
+
+		public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
+
+		@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
+		private final long failureTimestamp;
+
+		@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
+		@Nullable
+		private final String failureMessage;
+
+		@JsonCreator
+		public FailedCheckpointStatistics(
+			@JsonProperty(FIELD_NAME_ID) long id,
+			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
+			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
+			super(
+				id,
+				status,
+				savepoint,
+				triggerTimestamp,
+				latestAckTimestamp,
+				stateSize,
+				duration,
+				alignmentBuffered,
+				numSubtasks,
+				numAckSubtasks,
+				checkpointingStatisticsPerTask);
+
+			this.failureTimestamp = failureTimestamp;
+			this.failureMessage = failureMessage;
+		}
+
+		public long getFailureTimestamp() {
+			return failureTimestamp;
+		}
+
+		@Nullable
+		public String getFailureMessage() {
+			return failureMessage;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			if (!super.equals(o)) {
+				return false;
+			}
+			FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
+			return failureTimestamp == that.failureTimestamp &&
+				Objects.equals(failureMessage, that.failureMessage);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
new file mode 100644
index 0000000..1f00fcc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointingStatisticsHandler}. This class contains information about
+ * the checkpointing of a given job.
+ */
+public class CheckpointingStatistics implements ResponseBody {
+
+	public static final String FIELD_NAME_COUNTS = "counts";
+
+	public static final String FIELD_NAME_SUMMARY = "summary";
+
+	public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+	public static final String FIELD_NAME_HISTORY = "history";
+
+	@JsonProperty(FIELD_NAME_COUNTS)
+	private final Counts counts;
+
+	@JsonProperty(FIELD_NAME_SUMMARY)
+	private final Summary summary;
+
+	@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+	private final LatestCheckpoints latestCheckpoints;
+
+	@JsonProperty(FIELD_NAME_HISTORY)
+	private final List<CheckpointStatistics> history;
+
+	/**
+	 * Creates the response from its four parts.
+	 *
+	 * <p>Every argument is passed through {@code Preconditions.checkNotNull}, so none of
+	 * them may be {@code null}. The constructor is also the Jackson creator; the JSON keys
+	 * are given by the {@code FIELD_NAME_*} constants of this class.
+	 *
+	 * @param counts checkpoint counters
+	 * @param summary summary statistics
+	 * @param latestCheckpoints statistics about the latest checkpoints
+	 * @param history list of checkpoint statistics
+	 */
+	@JsonCreator
+	public CheckpointingStatistics(
+			@JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+			@JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+			@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
+			@JsonProperty(FIELD_NAME_HISTORY) List<CheckpointStatistics> history) {
+		// validate everything first, in declaration order, then store the references
+		Preconditions.checkNotNull(counts);
+		Preconditions.checkNotNull(summary);
+		Preconditions.checkNotNull(latestCheckpoints);
+		Preconditions.checkNotNull(history);
+
+		this.counts = counts;
+		this.summary = summary;
+		this.latestCheckpoints = latestCheckpoints;
+		this.history = history;
+	}
+
+	/**
+	 * Returns the checkpoint counters given to the constructor.
+	 *
+	 * @return the {@link Counts} instance held by this response
+	 */
+	public Counts getCounts() {
+		return this.counts;
+	}
+
+	/**
+	 * Returns the summary statistics given to the constructor.
+	 *
+	 * @return the {@link Summary} instance held by this response
+	 */
+	public Summary getSummary() {
+		return this.summary;
+	}
+
+	/**
+	 * Returns the statistics about the latest checkpoints given to the constructor.
+	 *
+	 * @return the {@link LatestCheckpoints} instance held by this response
+	 */
+	public LatestCheckpoints getLatestCheckpoints() {
+		return this.latestCheckpoints;
+	}
+
+	/**
+	 * Returns the checkpoint history given to the constructor.
+	 *
+	 * <p>The stored list reference itself is returned; no copy is made.
+	 *
+	 * @return the list of {@link CheckpointStatistics} held by this response
+	 */
+	public List<CheckpointStatistics> getHistory() {
+		return this.history;
+	}
+
+	/**
+	 * Compares this response with another object.
+	 *
+	 * <p>Two responses are equal when they have exactly the same class and their counts,
+	 * summary, latest checkpoints and history are pairwise equal.
+	 *
+	 * @param o object to compare against, may be {@code null}
+	 * @return {@code true} if both objects are equal, {@code false} otherwise
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		if (o == null) {
+			return false;
+		}
+		if (o.getClass() != getClass()) {
+			return false;
+		}
+		final CheckpointingStatistics other = (CheckpointingStatistics) o;
+		if (!Objects.equals(counts, other.counts)) {
+			return false;
+		}
+		if (!Objects.equals(summary, other.summary)) {
+			return false;
+		}
+		if (!Objects.equals(latestCheckpoints, other.latestCheckpoints)) {
+			return false;
+		}
+		return Objects.equals(history, other.history);
+	}
+
+	/**
+	 * Computes the hash from the counts, summary, latest checkpoints and history.
+	 *
+	 * @return hash code consistent with {@link #equals(Object)}
+	 */
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			counts,
+			summary,
+			latestCheckpoints,
+			history);
+	}
+
+	// ------------------------------------------------------------------
+	// Inner classes
+	// ------------------------------------------------------------------
+
+	/**
+	 * Checkpoint counters: number of restored, total, in-progress, completed and failed
+	 * checkpoints.
+	 *
+	 * <p>All values are stored exactly as passed to the constructor and are exposed to
+	 * Jackson under the keys defined by the {@code FIELD_NAME_*} constants.
+	 */
+	public static final class Counts {
+
+		public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored";
+
+		public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total";
+
+		public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress";
+
+		public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed";
+
+		public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed";
+
+		@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
+		private final long numberRestoredCheckpoints;
+
+		@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
+		private final long totalNumberCheckpoints;
+
+		@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
+		private final int numberInProgressCheckpoints;
+
+		@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
+		private final long numberCompletedCheckpoints;
+
+		@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
+		private final long numberFailedCheckpoints;
+
+		/**
+		 * Creates the counters; also used by Jackson as the creator.
+		 *
+		 * @param numberRestoredCheckpoints value for the {@code restored} key
+		 * @param totalNumberCheckpoints value for the {@code total} key
+		 * @param numberInProgressCheckpoints value for the {@code in_progress} key
+		 * @param numberCompletedCheckpoints value for the {@code completed} key
+		 * @param numberFailedCheckpoints value for the {@code failed} key
+		 */
+		@JsonCreator
+		public Counts(
+				@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints,
+				@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints,
+				@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints,
+				@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints,
+				@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) {
+			this.numberFailedCheckpoints = numberFailedCheckpoints;
+			this.numberCompletedCheckpoints = numberCompletedCheckpoints;
+			this.numberInProgressCheckpoints = numberInProgressCheckpoints;
+			this.totalNumberCheckpoints = totalNumberCheckpoints;
+			this.numberRestoredCheckpoints = numberRestoredCheckpoints;
+		}
+
+		/**
+		 * @return the number of restored checkpoints
+		 */
+		public long getNumberRestoredCheckpoints() {
+			return this.numberRestoredCheckpoints;
+		}
+
+		/**
+		 * @return the total number of checkpoints
+		 */
+		public long getTotalNumberCheckpoints() {
+			return this.totalNumberCheckpoints;
+		}
+
+		/**
+		 * @return the number of in-progress checkpoints
+		 */
+		public int getNumberInProgressCheckpoints() {
+			return this.numberInProgressCheckpoints;
+		}
+
+		/**
+		 * @return the number of completed checkpoints
+		 */
+		public long getNumberCompletedCheckpoints() {
+			return this.numberCompletedCheckpoints;
+		}
+
+		/**
+		 * @return the number of failed checkpoints
+		 */
+		public long getNumberFailedCheckpoints() {
+			return this.numberFailedCheckpoints;
+		}
+
+		/**
+		 * Two instances are equal when they have the same class and all five counters match.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final Counts other = (Counts) o;
+			if (numberRestoredCheckpoints != other.numberRestoredCheckpoints) {
+				return false;
+			}
+			if (totalNumberCheckpoints != other.totalNumberCheckpoints) {
+				return false;
+			}
+			if (numberInProgressCheckpoints != other.numberInProgressCheckpoints) {
+				return false;
+			}
+			if (numberCompletedCheckpoints != other.numberCompletedCheckpoints) {
+				return false;
+			}
+			return numberFailedCheckpoints == other.numberFailedCheckpoints;
+		}
+
+		/**
+		 * Hash over all five counters, consistent with {@link #equals(Object)}.
+		 */
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				numberRestoredCheckpoints,
+				totalNumberCheckpoints,
+				numberInProgressCheckpoints,
+				numberCompletedCheckpoints,
+				numberFailedCheckpoints);
+		}
+	}
+
+	/**
+	 * Checkpoint summary consisting of {@link MinMaxAvgStatistics} for the state size, the
+	 * end-to-end duration and the alignment buffered value.
+	 *
+	 * <p>The constructor stores its arguments as given and performs no null checks.
+	 */
+	public static final class Summary {
+
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final MinMaxAvgStatistics stateSize;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final MinMaxAvgStatistics duration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+		private final MinMaxAvgStatistics alignmentBuffered;
+
+		/**
+		 * Creates the summary; also used by Jackson as the creator.
+		 *
+		 * @param stateSize statistics for the {@code state_size} key
+		 * @param duration statistics for the {@code end_to_end_duration} key
+		 * @param alignmentBuffered statistics for the {@code alignment_buffered} key
+		 */
+		@JsonCreator
+		public Summary(
+				@JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) {
+			this.alignmentBuffered = alignmentBuffered;
+			this.duration = duration;
+			this.stateSize = stateSize;
+		}
+
+		/**
+		 * @return the state size statistics
+		 */
+		public MinMaxAvgStatistics getStateSize() {
+			return this.stateSize;
+		}
+
+		/**
+		 * @return the duration statistics
+		 */
+		public MinMaxAvgStatistics getDuration() {
+			return this.duration;
+		}
+
+		/**
+		 * @return the alignment buffered statistics
+		 */
+		public MinMaxAvgStatistics getAlignmentBuffered() {
+			return this.alignmentBuffered;
+		}
+
+		/**
+		 * Two instances are equal when they have the same class and all three statistics
+		 * are equal according to {@link Objects#equals(Object, Object)}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final Summary other = (Summary) o;
+			if (!Objects.equals(stateSize, other.stateSize)) {
+				return false;
+			}
+			if (!Objects.equals(duration, other.duration)) {
+				return false;
+			}
+			return Objects.equals(alignmentBuffered, other.alignmentBuffered);
+		}
+
+		/**
+		 * Hash over all three statistics, consistent with {@link #equals(Object)}.
+		 */
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				stateSize,
+				duration,
+				alignmentBuffered);
+		}
+	}
+
+	/**
+	 * Holder for a minimum, a maximum and an average value, each stored as a {@code long}.
+	 *
+	 * <p>The values are kept exactly as passed to the constructor; no relation between
+	 * them is enforced.
+	 */
+	public static final class MinMaxAvgStatistics {
+
+		public static final String FIELD_NAME_MINIMUM = "min";
+
+		public static final String FIELD_NAME_MAXIMUM = "max";
+
+		public static final String FIELD_NAME_AVERAGE = "avg";
+
+		@JsonProperty(FIELD_NAME_MINIMUM)
+		private final long minimum;
+
+		@JsonProperty(FIELD_NAME_MAXIMUM)
+		private final long maximum;
+
+		@JsonProperty(FIELD_NAME_AVERAGE)
+		private final long average;
+
+		/**
+		 * Creates the statistics; also used by Jackson as the creator.
+		 *
+		 * @param minimum value for the {@code min} key
+		 * @param maximum value for the {@code max} key
+		 * @param average value for the {@code avg} key
+		 */
+		@JsonCreator
+		public MinMaxAvgStatistics(
+				@JsonProperty(FIELD_NAME_MINIMUM) long minimum,
+				@JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
+				@JsonProperty(FIELD_NAME_AVERAGE) long average) {
+			this.average = average;
+			this.maximum = maximum;
+			this.minimum = minimum;
+		}
+
+		/**
+		 * @return the minimum value
+		 */
+		public long getMinimum() {
+			return this.minimum;
+		}
+
+		/**
+		 * @return the maximum value
+		 */
+		public long getMaximum() {
+			return this.maximum;
+		}
+
+		/**
+		 * @return the average value
+		 */
+		public long getAverage() {
+			return this.average;
+		}
+
+		/**
+		 * Two instances are equal when they have the same class and identical minimum,
+		 * maximum and average.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final MinMaxAvgStatistics other = (MinMaxAvgStatistics) o;
+			if (minimum != other.minimum) {
+				return false;
+			}
+			if (maximum != other.maximum) {
+				return false;
+			}
+			return average == other.average;
+		}
+
+		/**
+		 * Hash over minimum, maximum and average, consistent with {@link #equals(Object)}.
+		 */
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				minimum,
+				maximum,
+				average);
+		}
+	}
+
+	/**
+	 * Statistics about the latest checkpoints, grouped under the keys {@code completed},
+	 * {@code savepoint}, {@code failed} and {@code restored}.
+	 *
+	 * <p>Each of the four entries is optional and may be {@code null}.
+	 */
+	public static final class LatestCheckpoints {
+
+		public static final String FIELD_NAME_COMPLETED = "completed";
+
+		public static final String FIELD_NAME_SAVEPOINT = "savepoint";
+
+		public static final String FIELD_NAME_FAILED = "failed";
+
+		public static final String FIELD_NAME_RESTORED = "restored";
+
+		@JsonProperty(FIELD_NAME_COMPLETED)
+		@Nullable
+		private final CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics;
+
+		@JsonProperty(FIELD_NAME_SAVEPOINT)
+		@Nullable
+		private final CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics;
+
+		@JsonProperty(FIELD_NAME_FAILED)
+		@Nullable
+		private final CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics;
+
+		@JsonProperty(FIELD_NAME_RESTORED)
+		@Nullable
+		private final RestoredCheckpointStatistics restoredCheckpointStatistics;
+
+		/**
+		 * Creates the container; also used by Jackson as the creator.
+		 *
+		 * @param completedCheckpointStatistics entry for the {@code completed} key, may be {@code null}
+		 * @param savepointStatistics entry for the {@code savepoint} key, may be {@code null}
+		 * @param failedCheckpointStatistics entry for the {@code failed} key, may be {@code null}
+		 * @param restoredCheckpointStatistics entry for the {@code restored} key, may be {@code null}
+		 */
+		@JsonCreator
+		public LatestCheckpoints(
+				@JsonProperty(FIELD_NAME_COMPLETED) @Nullable CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics,
+				@JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics,
+				@JsonProperty(FIELD_NAME_FAILED) @Nullable CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics,
+				@JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
+			this.restoredCheckpointStatistics = restoredCheckpointStatistics;
+			this.failedCheckpointStatistics = failedCheckpointStatistics;
+			this.savepointStatistics = savepointStatistics;
+			this.completedCheckpointStatistics = completedCheckpointStatistics;
+		}
+
+		/**
+		 * @return the entry stored for the {@code completed} key, possibly {@code null}
+		 */
+		@Nullable
+		public CheckpointStatistics.CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
+			return this.completedCheckpointStatistics;
+		}
+
+		/**
+		 * @return the entry stored for the {@code savepoint} key, possibly {@code null}
+		 */
+		@Nullable
+		public CheckpointStatistics.CompletedCheckpointStatistics getSavepointStatistics() {
+			return this.savepointStatistics;
+		}
+
+		/**
+		 * @return the entry stored for the {@code failed} key, possibly {@code null}
+		 */
+		@Nullable
+		public CheckpointStatistics.FailedCheckpointStatistics getFailedCheckpointStatistics() {
+			return this.failedCheckpointStatistics;
+		}
+
+		/**
+		 * @return the entry stored for the {@code restored} key, possibly {@code null}
+		 */
+		@Nullable
+		public RestoredCheckpointStatistics getRestoredCheckpointStatistics() {
+			return this.restoredCheckpointStatistics;
+		}
+
+		/**
+		 * Two instances are equal when they have the same class and all four entries are
+		 * equal according to {@link Objects#equals(Object, Object)}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final LatestCheckpoints other = (LatestCheckpoints) o;
+			if (!Objects.equals(completedCheckpointStatistics, other.completedCheckpointStatistics)) {
+				return false;
+			}
+			if (!Objects.equals(savepointStatistics, other.savepointStatistics)) {
+				return false;
+			}
+			if (!Objects.equals(failedCheckpointStatistics, other.failedCheckpointStatistics)) {
+				return false;
+			}
+			return Objects.equals(restoredCheckpointStatistics, other.restoredCheckpointStatistics);
+		}
+
+		/**
+		 * Hash over all four entries, consistent with {@link #equals(Object)}.
+		 */
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				completedCheckpointStatistics,
+				savepointStatistics,
+				failedCheckpointStatistics,
+				restoredCheckpointStatistics);
+		}
+	}
+
+	/**
+	 * Statistics for a restored checkpoint: its id, the restore timestamp, a flag telling
+	 * whether it is a savepoint and an optional external path.
+	 */
+	public static final class RestoredCheckpointStatistics {
+
+		public static final String FIELD_NAME_ID = "id";
+
+		public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp";
+
+		public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+		@JsonProperty(FIELD_NAME_ID)
+		private final long id;
+
+		@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
+		private final long restoreTimestamp;
+
+		@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+		private final boolean savepoint;
+
+		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+		@Nullable
+		private final String externalPath;
+
+		/**
+		 * Creates the statistics; also used by Jackson as the creator.
+		 *
+		 * @param id value for the {@code id} key
+		 * @param restoreTimestamp value for the {@code restore_timestamp} key
+		 * @param savepoint value for the {@code is_savepoint} key
+		 * @param externalPath value for the {@code external_path} key, may be {@code null}
+		 */
+		@JsonCreator
+		public RestoredCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_ID) long id,
+				@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp,
+				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+				@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) {
+			this.externalPath = externalPath;
+			this.savepoint = savepoint;
+			this.restoreTimestamp = restoreTimestamp;
+			this.id = id;
+		}
+
+		/**
+		 * @return the checkpoint id
+		 */
+		public long getId() {
+			return this.id;
+		}
+
+		/**
+		 * @return the restore timestamp
+		 */
+		public long getRestoreTimestamp() {
+			return this.restoreTimestamp;
+		}
+
+		/**
+		 * @return the value of the savepoint flag
+		 */
+		public boolean isSavepoint() {
+			return this.savepoint;
+		}
+
+		/**
+		 * @return the external path, possibly {@code null}
+		 */
+		@Nullable
+		public String getExternalPath() {
+			return this.externalPath;
+		}
+
+		/**
+		 * Two instances are equal when they have the same class and id, restore timestamp,
+		 * savepoint flag and external path all match.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null || o.getClass() != getClass()) {
+				return false;
+			}
+			final RestoredCheckpointStatistics other = (RestoredCheckpointStatistics) o;
+			if (id != other.id) {
+				return false;
+			}
+			if (restoreTimestamp != other.restoreTimestamp) {
+				return false;
+			}
+			if (savepoint != other.savepoint) {
+				return false;
+			}
+			return Objects.equals(externalPath, other.externalPath);
+		}
+
+		/**
+		 * Hash over all four fields, consistent with {@link #equals(Object)}.
+		 */
+		@Override
+		public int hashCode() {
+			return Objects.hash(
+				id,
+				restoreTimestamp,
+				savepoint,
+				externalPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
new file mode 100644
index 0000000..ce809e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointingStatisticsHandler}.
+ *
+ * <p>The headers describe a {@code GET} request on {@link #URL} that carries an
+ * {@link EmptyRequestBody}, is parameterized by {@link JobMessageParameters} and is
+ * answered with a {@link CheckpointingStatistics} body and status {@code OK}.
+ *
+ * <p>A shared instance is available via {@link #getInstance()}.
+ */
+public class CheckpointingStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> {
+
+	private static final CheckpointingStatisticsHeaders INSTANCE = new CheckpointingStatisticsHeaders();
+
+	public static final String URL = "/jobs/:jobid/checkpoints";
+
+	/**
+	 * Returns the shared instance of these headers.
+	 *
+	 * @return the instance created when this class is initialized
+	 */
+	public static CheckpointingStatisticsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------
+	// Endpoint description
+	// ------------------------------------------------------------------
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		// a new parameter object is handed out on every call
+		return new JobMessageParameters();
+	}
+
+	// ------------------------------------------------------------------
+	// Request / response types
+	// ------------------------------------------------------------------
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<CheckpointingStatistics> getResponseClass() {
+		return CheckpointingStatistics.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
new file mode 100644
index 0000000..b1f083f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson {@link KeyDeserializer} that converts a JSON object key into a
+ * {@link JobVertexID}.
+ *
+ * <p>The key string is parsed with {@code JobVertexID.fromHexString}.
+ */
+public class JobVertexIDDeserializer extends KeyDeserializer {
+
+	/**
+	 * Parses the given key into a {@link JobVertexID}.
+	 *
+	 * @param key the JSON object key to convert
+	 * @param ctxt the deserialization context; not used by this implementation
+	 * @return the {@link JobVertexID} produced by {@code JobVertexID.fromHexString(key)}
+	 */
+	@Override
+	public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
+		final JobVertexID vertexId = JobVertexID.fromHexString(key);
+		return vertexId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
new file mode 100644
index 0000000..f2b9859
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer that writes a {@link JobVertexID} as a JSON field name.
+ *
+ * <p>The emitted name is {@code value.toString()}. This class writes only the field
+ * name, never a JSON value.
+ */
+public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
+
+	private static final long serialVersionUID = 2970050507628933522L;
+
+	/**
+	 * Creates a serializer registered for the {@link JobVertexID} type.
+	 */
+	public JobVertexIDSerializer() {
+		super(JobVertexID.class);
+	}
+
+	/**
+	 * Writes the string form of the given id as a field name.
+	 *
+	 * @param value the id to write
+	 * @param gen generator receiving the field name
+	 * @param provider serializer provider; not used by this implementation
+	 * @throws IOException if the generator fails to write the field name
+	 */
+	@Override
+	public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		final String fieldName = value.toString();
+		gen.writeFieldName(fieldName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
index 04b1c55..7337772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 1eac20b..263117a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;