You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/10 15:35:04 UTC
[1/3] flink git commit: [FLINK-7709] Add
CheckpointStatisticDetailsHandler for new REST endpoint
Repository: flink
Updated Branches:
refs/heads/master 6b3fdc288 -> 0a286d0ff
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index b352bae..bcb13d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
index 709c21d..deffaae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rest.handler.legacy.messages;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
/**
* Tests for the {@link CheckpointConfigInfo}.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
deleted file mode 100644
index 8e8a50f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
-
-import java.util.Arrays;
-
-/**
- * Tests for {@link CheckpointStatistics}.
- */
-public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<CheckpointStatistics> {
- @Override
- protected Class<CheckpointStatistics> getTestResponseClass() {
- return CheckpointStatistics.class;
- }
-
- @Override
- protected CheckpointStatistics getTestResponseInstance() throws Exception {
-
- final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(1, 2, 3, 4, 5);
- final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
- new CheckpointStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
- new CheckpointStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
- new CheckpointStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
-
- final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
- 1L,
- CheckpointStatsStatus.COMPLETED,
- false,
- 42L,
- 41L,
- 1337L,
- 1L,
- 0L,
- 10,
- 10,
- null,
- false);
-
- final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
- 2L,
- CheckpointStatsStatus.COMPLETED,
- true,
- 11L,
- 10L,
- 43L,
- 1L,
- 0L,
- 9,
- 9,
- "externalPath",
- false);
-
- final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
- 3L,
- CheckpointStatsStatus.FAILED,
- false,
- 5L,
- 10L,
- 4L,
- 2L,
- 0L,
- 11,
- 9,
- 100L,
- "Test failure");
-
- CheckpointStatistics.RestoredCheckpointStatistics restored = new CheckpointStatistics.RestoredCheckpointStatistics(
- 4L,
- 1445L,
- true,
- "foobar");
-
- final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
- completed,
- savepoint,
- failed,
- restored);
-
- return new CheckpointStatistics(
- counts,
- summary,
- latestCheckpoints,
- Arrays.asList(completed, savepoint, failed));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
new file mode 100644
index 0000000..8521d34
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link CheckpointingStatistics}.
+ */
+public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
+ @Override
+ protected Class<CheckpointingStatistics> getTestResponseClass() {
+ return CheckpointingStatistics.class;
+ }
+
+ @Override
+ protected CheckpointingStatistics getTestResponseInstance() throws Exception {
+
+ final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
+ final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
+ new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
+ new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
+ new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
+
+ final Map<JobVertexID, CheckpointStatistics.TaskCheckpointStatistics> checkpointStatisticsPerTask = new HashMap<>(2);
+
+ checkpointStatisticsPerTask.put(
+ new JobVertexID(),
+ new CheckpointStatistics.TaskCheckpointStatistics(
+ 1L,
+ 2L,
+ 3L,
+ 4L,
+ 5,
+ 6));
+
+ checkpointStatisticsPerTask.put(
+ new JobVertexID(),
+ new CheckpointStatistics.TaskCheckpointStatistics(
+ 2L,
+ 3L,
+ 4L,
+ 5L,
+ 6,
+ 7));
+
+ final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
+ 1L,
+ CheckpointStatsStatus.COMPLETED,
+ false,
+ 42L,
+ 41L,
+ 1337L,
+ 1L,
+ 0L,
+ 10,
+ 10,
+ Collections.emptyMap(),
+ null,
+ false);
+
+ final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
+ 2L,
+ CheckpointStatsStatus.COMPLETED,
+ true,
+ 11L,
+ 10L,
+ 43L,
+ 1L,
+ 0L,
+ 9,
+ 9,
+ checkpointStatisticsPerTask,
+ "externalPath",
+ false);
+
+ final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
+ 3L,
+ CheckpointStatsStatus.FAILED,
+ false,
+ 5L,
+ 10L,
+ 4L,
+ 2L,
+ 0L,
+ 11,
+ 9,
+ Collections.emptyMap(),
+ 100L,
+ "Test failure");
+
+ CheckpointingStatistics.RestoredCheckpointStatistics restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
+ 4L,
+ 1445L,
+ true,
+ "foobar");
+
+ final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+ completed,
+ savepoint,
+ failed,
+ restored);
+
+ return new CheckpointingStatistics(
+ counts,
+ summary,
+ latestCheckpoints,
+ Arrays.asList(completed, savepoint, failed));
+ }
+}
[3/3] flink git commit: [FLINK-7709] Add
CheckpointStatisticDetailsHandler for new REST endpoint
Posted by tr...@apache.org.
[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint
Disable failing when not all creator properties are known
Move CheckpointStatsCache out of legacy package; Remove unused CheckpointingStatistics#generateCheckpointStatistics method
Remove JsonInclude.Include.NON_NULL from CheckpointStatistics; Pull null check out of CheckpointStatistics#generateCheckpointStatistics; Make CheckpointStatistics#checkpointStatisticcsPerTask non nullable; Add fail on missing creator property
This closes #4763.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a286d0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a286d0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a286d0f
Branch: refs/heads/master
Commit: 0a286d0ff98afa68034daff4634f526eaaf97897
Parents: 6b3fdc2
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 2 19:39:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 10 17:34:14 2017 +0200
----------------------------------------------------------------------
.../runtime/webmonitor/WebRuntimeMonitor.java | 2 +-
.../dispatcher/DispatcherRestEndpoint.java | 29 +-
.../rest/handler/RestHandlerConfiguration.java | 18 +-
.../job/AbstractExecutionGraphHandler.java | 10 +-
.../rest/handler/job/JobConfigHandler.java | 5 +-
.../checkpoints/AbstractCheckpointHandler.java | 91 +++
.../checkpoints/CheckpointConfigHandler.java | 7 +-
.../CheckpointStatisticDetailsHandler.java | 54 ++
.../CheckpointStatisticsHandler.java | 181 -----
.../job/checkpoints/CheckpointStatsCache.java | 81 ++
.../CheckpointingStatisticsHandler.java | 153 ++++
.../checkpoints/CheckpointConfigHandler.java | 2 +-
.../checkpoints/CheckpointStatsCache.java | 81 --
.../CheckpointStatsDetailsHandler.java | 1 +
.../CheckpointStatsDetailsSubtasksHandler.java | 1 +
.../checkpoints/CheckpointStatsHandler.java | 81 +-
.../rest/messages/CheckpointConfigHeaders.java | 70 --
.../rest/messages/CheckpointConfigInfo.java | 151 ----
.../rest/messages/CheckpointStatistics.java | 763 -------------------
.../messages/CheckpointStatisticsHeaders.java | 68 --
.../rest/messages/JobMessageParameters.java | 2 +-
.../checkpoints/CheckpointConfigHeaders.java | 73 ++
.../checkpoints/CheckpointConfigInfo.java | 152 ++++
.../checkpoints/CheckpointIdPathParameter.java | 48 ++
.../CheckpointMessageParameters.java | 38 +
.../CheckpointStatisticDetailsHeaders.java | 72 ++
.../checkpoints/CheckpointStatistics.java | 537 +++++++++++++
.../checkpoints/CheckpointingStatistics.java | 478 ++++++++++++
.../CheckpointingStatisticsHeaders.java | 71 ++
.../messages/json/JobVertexIDDeserializer.java | 37 +
.../messages/json/JobVertexIDSerializer.java | 44 ++
.../checkpoints/CheckpointStatsCacheTest.java | 1 +
.../CheckpointStatsDetailsHandlerTest.java | 1 +
...heckpointStatsSubtaskDetailsHandlerTest.java | 1 +
.../messages/CheckpointConfigInfoTest.java | 2 +-
.../messages/CheckpointStatisticsTest.java | 104 ---
.../messages/CheckpointingStatisticsTest.java | 134 ++++
37 files changed, 2164 insertions(+), 1480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0bf6552..1a6178f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index d64e649..2a2d9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -43,8 +45,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
-import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -52,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FileUtils;
@@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final Executor executor;
private final ExecutionGraphCache executionGraphCache;
+ private final CheckpointStatsCache checkpointStatsCache;
public DispatcherRestEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
@@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
this.executionGraphCache = new ExecutionGraphCache(
restConfiguration.getTimeout(),
Time.milliseconds(restConfiguration.getRefreshInterval()));
+
+ this.checkpointStatsCache = new CheckpointStatsCache(
+ restConfiguration.getMaxCheckpointStatisticCacheEntries());
}
@Override
@@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
- CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler(
+ CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler(
restAddressFuture,
leaderRetriever,
timeout,
- CheckpointStatisticsHeaders.getInstance(),
+ CheckpointingStatisticsHeaders.getInstance(),
executionGraphCache,
executor);
+ CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ CheckpointStatisticDetailsHeaders.getInstance(),
+ executionGraphCache,
+ executor,
+ checkpointStatsCache);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
- handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
+ handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
+ handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 9220bd9..0344597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -32,14 +32,22 @@ public class RestHandlerConfiguration {
private final long refreshInterval;
+ private final int maxCheckpointStatisticCacheEntries;
+
private final Time timeout;
private final File tmpDir;
- public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
+ public RestHandlerConfiguration(
+ long refreshInterval,
+ int maxCheckpointStatisticCacheEntries,
+ Time timeout,
+ File tmpDir) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
+ this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;
+
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
}
@@ -48,6 +56,10 @@ public class RestHandlerConfiguration {
return refreshInterval;
}
+ public int getMaxCheckpointStatisticCacheEntries() {
+ return maxCheckpointStatisticCacheEntries;
+ }
+
public Time getTimeout() {
return timeout;
}
@@ -59,10 +71,12 @@ public class RestHandlerConfiguration {
public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
+ final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
+
final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
- return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
+ return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index f2b1ac8..5348b55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
*
* @param <R> response type
*/
-public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, JobMessageParameters> {
+public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M extends JobMessageParameters> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> {
private final ExecutionGraphCache executionGraphCache;
@@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
- MessageHeaders<EmptyRequestBody, R, JobMessageParameters> messageHeaders,
+ MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders);
@@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
}
@Override
- protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
JobID jobId = request.getPathParameter(JobIDPathParameter.class);
CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
@@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
return executionGraphFuture.thenApplyAsync(
executionGraph -> {
try {
- return handleRequest(executionGraph);
+ return handleRequest(request, executionGraph);
} catch (RestHandlerException rhe) {
throw new CompletionException(rhe);
}
@@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
executor);
}
- protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException;
+ protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph) throws RestHandlerException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index bbe4eef..f27d84f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobConfigInfo;
@@ -35,7 +36,7 @@ import java.util.concurrent.Executor;
/**
* Handler serving the job configuration.
*/
-public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo> {
+public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
public JobConfigHandler(
CompletableFuture<String> localRestAddress,
@@ -55,7 +56,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf
}
@Override
- protected JobConfigInfo handleRequest(AccessExecutionGraph executionGraph) {
+ protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
new file mode 100644
index 0000000..62ed1a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for checkpoint related REST handler.
+ *
+ * @param <R> type of the response
+ */
+public abstract class AbstractCheckpointHandler<R extends ResponseBody> extends AbstractExecutionGraphHandler<R, CheckpointMessageParameters> {
+
+ private final CheckpointStatsCache checkpointStatsCache;
+
+ protected AbstractCheckpointHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, R, CheckpointMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ CheckpointStatsCache checkpointStatsCache) {
+ super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+ this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache);
+ }
+
+ @Override
+ protected R handleRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+ final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class);
+
+ final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
+
+ if (checkpointStatsSnapshot != null) {
+ AbstractCheckpointStats checkpointStats = checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpointStats != null) {
+ checkpointStatsCache.tryAdd(checkpointStats);
+ } else {
+ checkpointStats = checkpointStatsCache.tryGet(checkpointId);
+ }
+
+ if (checkpointStats != null) {
+ return handleCheckpointRequest(checkpointStats);
+ } else {
+ throw new RestHandlerException("Could not find checkpointing statistics for checkpoint " + checkpointId + '.', HttpResponseStatus.NOT_FOUND);
+ }
+ } else {
+ throw new RestHandlerException("Checkpointing was not enabled for job " + executionGraph.getJobID() + '.', HttpResponseStatus.NOT_FOUND);
+ }
+ }
+
+ protected abstract R handleCheckpointRequest(AbstractCheckpointStats checkpointStats);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 94646eb..1efa7af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -40,7 +41,7 @@ import java.util.concurrent.Executor;
/**
* Handler which serves the checkpoint configuration.
*/
-public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> {
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
public CheckpointConfigHandler(
CompletableFuture<String> localRestAddress,
@@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
}
@Override
- protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
+ protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
if (checkpointCoordinatorConfiguration == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
new file mode 100644
index 0000000..2fc3008
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler which returns the details for a checkpoint.
+ */
+public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics> {
+
+ public CheckpointStatisticDetailsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ CheckpointStatsCache checkpointStatsCache) {
+ super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+ }
+
+ @Override
+ protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) {
+ return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
deleted file mode 100644
index 21ded78..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job.checkpoints;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler which serves the checkpoint statistics.
- */
-public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointStatistics> {
-
- public CheckpointStatisticsHandler(
- CompletableFuture<String> localRestAddress,
- GatewayRetriever<? extends RestfulGateway> leaderRetriever,
- Time timeout,
- MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> messageHeaders,
- ExecutionGraphCache executionGraphCache,
- Executor executor) {
- super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
- }
-
- @Override
- protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
-
- final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
-
- if (checkpointStatsSnapshot == null) {
- throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND);
- } else {
- final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
-
- final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(
- checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
- checkpointStatsCounts.getTotalNumberOfCheckpoints(),
- checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
- checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
- checkpointStatsCounts.getNumberOfFailedCheckpoints());
-
- final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
- final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats();
- final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
- final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
-
- final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
- new CheckpointStatistics.MinMaxAvgStatistics(
- stateSize.getMinimum(),
- stateSize.getMaximum(),
- stateSize.getAverage()),
- new CheckpointStatistics.MinMaxAvgStatistics(
- duration.getMinimum(),
- duration.getMaximum(),
- duration.getAverage()),
- new CheckpointStatistics.MinMaxAvgStatistics(
- alignment.getMinimum(),
- alignment.getMaximum(),
- alignment.getAverage()));
-
- final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
-
- final CheckpointStatistics.CompletedCheckpointStatistics completed = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint());
- final CheckpointStatistics.CompletedCheckpointStatistics savepoint = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint());
- final CheckpointStatistics.FailedCheckpointStatistics failed = (CheckpointStatistics.FailedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint());
-
- final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
-
- final CheckpointStatistics.RestoredCheckpointStatistics restored;
-
- if (restoredCheckpointStats == null) {
- restored = null;
- } else {
- restored = new CheckpointStatistics.RestoredCheckpointStatistics(
- restoredCheckpointStats.getCheckpointId(),
- restoredCheckpointStats.getRestoreTimestamp(),
- restoredCheckpointStats.getProperties().isSavepoint(),
- restoredCheckpointStats.getExternalPath());
- }
-
- final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
- completed,
- savepoint,
- failed,
- restored);
-
- final List<CheckpointStatistics.BaseCheckpointStatistics> history = new ArrayList<>(16);
-
- for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
- history.add(generateCheckpointStatistics(abstractCheckpointStats));
- }
-
- return new CheckpointStatistics(
- counts,
- summary,
- latestCheckpoints,
- history);
- }
- }
-
- private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
- if (checkpointStats != null) {
- if (checkpointStats instanceof CompletedCheckpointStats) {
- final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
-
- return new CheckpointStatistics.CompletedCheckpointStatistics(
- completedCheckpointStats.getCheckpointId(),
- completedCheckpointStats.getStatus(),
- completedCheckpointStats.getProperties().isSavepoint(),
- completedCheckpointStats.getTriggerTimestamp(),
- completedCheckpointStats.getLatestAckTimestamp(),
- completedCheckpointStats.getStateSize(),
- completedCheckpointStats.getEndToEndDuration(),
- completedCheckpointStats.getAlignmentBuffered(),
- completedCheckpointStats.getNumberOfSubtasks(),
- completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
- completedCheckpointStats.getExternalPath(),
- completedCheckpointStats.isDiscarded());
- } else if (checkpointStats instanceof FailedCheckpointStats) {
- final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
-
- return new CheckpointStatistics.FailedCheckpointStatistics(
- failedCheckpointStats.getCheckpointId(),
- failedCheckpointStats.getStatus(),
- failedCheckpointStats.getProperties().isSavepoint(),
- failedCheckpointStats.getTriggerTimestamp(),
- failedCheckpointStats.getLatestAckTimestamp(),
- failedCheckpointStats.getStateSize(),
- failedCheckpointStats.getEndToEndDuration(),
- failedCheckpointStats.getAlignmentBuffered(),
- failedCheckpointStats.getNumberOfSubtasks(),
- failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
- failedCheckpointStats.getFailureTimestamp(),
- failedCheckpointStats.getFailureMessage());
- } else {
- throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
- }
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..dcd36b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example if you manage to click on the last
+ * checkpoint in the history, it is not available via the stats as soon
+ * as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+ @Nullable
+ private final Cache<Long, AbstractCheckpointStats> cache;
+
+ public CheckpointStatsCache(int maxNumEntries) {
+ if (maxNumEntries > 0) {
+ this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+ .maximumSize(maxNumEntries)
+ .build();
+ } else {
+ this.cache = null;
+ }
+ }
+
+ /**
+ * Try to add the checkpoint to the cache.
+ *
+ * @param checkpoint Checkpoint to be added.
+ */
+ public void tryAdd(AbstractCheckpointStats checkpoint) {
+ // Don't add in progress checkpoints as they will be replaced by their
+ // completed/failed version eventually.
+ if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
+ cache.put(checkpoint.getCheckpointId(), checkpoint);
+ }
+ }
+
+ /**
+ * Try to look up a checkpoint by it's ID in the cache.
+ *
+ * @param checkpointId ID of the checkpoint to look up.
+ * @return The checkpoint or <code>null</code> if checkpoint not found.
+ */
+ public AbstractCheckpointStats tryGet(long checkpointId) {
+ if (cache != null) {
+ return cache.getIfPresent(checkpointId);
+ } else {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
new file mode 100644
index 0000000..1c5762e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint statistics.
+ */
+public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+
+ public CheckpointingStatisticsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+ }
+
+ @Override
+ protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+ final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
+
+ if (checkpointStatsSnapshot == null) {
+ throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND);
+ } else {
+ final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
+
+ final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(
+ checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
+ checkpointStatsCounts.getTotalNumberOfCheckpoints(),
+ checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
+ checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
+ checkpointStatsCounts.getNumberOfFailedCheckpoints());
+
+ final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
+ final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats();
+ final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
+ final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
+
+ final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
+ new CheckpointingStatistics.MinMaxAvgStatistics(
+ stateSize.getMinimum(),
+ stateSize.getMaximum(),
+ stateSize.getAverage()),
+ new CheckpointingStatistics.MinMaxAvgStatistics(
+ duration.getMinimum(),
+ duration.getMaximum(),
+ duration.getAverage()),
+ new CheckpointingStatistics.MinMaxAvgStatistics(
+ alignment.getMinimum(),
+ alignment.getMaximum(),
+ alignment.getAverage()));
+
+ final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
+
+ final CheckpointStatistics.CompletedCheckpointStatistics completed = checkpointStatsHistory.getLatestCompletedCheckpoint() != null ?
+ (CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+ checkpointStatsHistory.getLatestCompletedCheckpoint(),
+ false) :
+ null;
+
+ final CheckpointStatistics.CompletedCheckpointStatistics savepoint = checkpointStatsHistory.getLatestSavepoint() != null ?
+ (CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+ checkpointStatsHistory.getLatestSavepoint(),
+ false) :
+ null;
+
+ final CheckpointStatistics.FailedCheckpointStatistics failed = checkpointStatsHistory.getLatestFailedCheckpoint() != null ?
+ (CheckpointStatistics.FailedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
+ checkpointStatsHistory.getLatestFailedCheckpoint(),
+ false) :
+ null;
+
+ final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
+
+ final CheckpointingStatistics.RestoredCheckpointStatistics restored;
+
+ if (restoredCheckpointStats == null) {
+ restored = null;
+ } else {
+ restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
+ restoredCheckpointStats.getCheckpointId(),
+ restoredCheckpointStats.getRestoreTimestamp(),
+ restoredCheckpointStats.getProperties().isSavepoint(),
+ restoredCheckpointStats.getExternalPath());
+ }
+
+ final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+ completed,
+ savepoint,
+ failed,
+ restored);
+
+ final List<CheckpointStatistics> history = new ArrayList<>(16);
+
+ for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
+ history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, false));
+ }
+
+ return new CheckpointingStatistics(
+ counts,
+ summary,
+ latestCheckpoints,
+ history);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index f50c42d..60b9799 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
deleted file mode 100644
index f21fc76..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-/**
- * A size-based cache of accessed checkpoints for completed and failed
- * checkpoints.
- *
- * <p>Having this cache in place for accessed stats improves the user
- * experience quite a bit as accessed checkpoint stats stay available
- * and don't expire. For example if you manage to click on the last
- * checkpoint in the history, it is not available via the stats as soon
- * as another checkpoint is triggered. With the cache in place, the
- * checkpoint will still be available for investigation.
- */
-public class CheckpointStatsCache {
-
- @Nullable
- private final Cache<Long, AbstractCheckpointStats> cache;
-
- public CheckpointStatsCache(int maxNumEntries) {
- if (maxNumEntries > 0) {
- this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
- .maximumSize(maxNumEntries)
- .build();
- } else {
- this.cache = null;
- }
- }
-
- /**
- * Try to add the checkpoint to the cache.
- *
- * @param checkpoint Checkpoint to be added.
- */
- void tryAdd(AbstractCheckpointStats checkpoint) {
- // Don't add in progress checkpoints as they will be replaced by their
- // completed/failed version eventually.
- if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
- cache.put(checkpoint.getCheckpointId(), checkpoint);
- }
- }
-
- /**
- * Try to look up a checkpoint by it's ID in the cache.
- *
- * @param checkpointId ID of the checkpoint to look up.
- * @return The checkpoint or <code>null</code> if checkpoint not found.
- */
- AbstractCheckpointStats tryGet(long checkpointId) {
- if (cache != null) {
- return cache.getIfPresent(checkpointId);
- } else {
- return null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index e277971..dce1641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 5420cf4..1421fb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index 5b35c7f..b6c86be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
@@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
- gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS);
- gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
- gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
- gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
- gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
- gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
+ gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS);
+ gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
+ gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
+ gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
+ gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
+ gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
gen.writeEndObject();
}
private static void writeSummary(
JsonGenerator gen,
CompletedCheckpointStatsSummary summary) throws IOException {
- gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY);
- gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE);
+ gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY);
+ gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE);
writeMinMaxAvg(gen, summary.getStateSizeStats());
gen.writeEndObject();
- gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION);
+ gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION);
writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
gen.writeEndObject();
- gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
+ gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
gen.writeEndObject();
gen.writeEndObject();
}
static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
- gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
- gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
- gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
+ gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
+ gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
+ gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
}
private static void writeLatestCheckpoints(
@@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
@Nullable FailedCheckpointStats failed,
@Nullable RestoredCheckpointStats restored) throws IOException {
- gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
+ gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
// Completed checkpoint
if (completed != null) {
- gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
+ gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
writeCheckpoint(gen, completed);
String externalPath = completed.getExternalPath();
@@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Completed savepoint
if (savepoint != null) {
- gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
+ gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
writeCheckpoint(gen, savepoint);
String externalPath = savepoint.getExternalPath();
@@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Failed checkpoint
if (failed != null) {
- gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
+ gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
writeCheckpoint(gen, failed);
gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp());
@@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Restored checkpoint
if (restored != null) {
- gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
- gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
- gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
- gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
+ gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
+ gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
+ gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
+ gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
String externalPath = restored.getExternalPath();
if (externalPath != null) {
- gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
+ gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
}
gen.writeEndObject();
}
@@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
}
private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
- gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY);
+ gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY);
for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
gen.writeStartObject();
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
- gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
- gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
- gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+ gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
+ gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
+ gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
if (checkpoint.getStatus().isCompleted()) {
// --- Completed ---
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
deleted file mode 100644
index bfc0b7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
-
- private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
-
- public static final String URL = "/jobs/:jobid/checkpoints/config";
-
- private CheckpointConfigHeaders() {}
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public Class<CheckpointConfigInfo> getResponseClass() {
- return CheckpointConfigInfo.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public JobMessageParameters getUnresolvedMessageParameters() {
- return new JobMessageParameters();
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.GET;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return URL;
- }
-
- public static CheckpointConfigHeaders getInstance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
deleted file mode 100644
index fbda12a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Response class of the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigInfo implements ResponseBody {
-
- public static final String FIELD_NAME_PROCESSING_MODE = "mode";
-
- public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
-
- public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
-
- public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
-
- public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
-
- public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
-
- @JsonProperty(FIELD_NAME_PROCESSING_MODE)
- private final ProcessingMode processingMode;
-
- @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
- private final long checkpointInterval;
-
- @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
- private final long checkpointTimeout;
-
- @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
- private final long minPauseBetweenCheckpoints;
-
- @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
- private final long maxConcurrentCheckpoints;
-
- @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
- private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
-
- @JsonCreator
- public CheckpointConfigInfo(
- @JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
- @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
- @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
- @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
- @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
- @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
- this.processingMode = Preconditions.checkNotNull(processingMode);
- this.checkpointInterval = checkpointInterval;
- this.checkpointTimeout = checkpointTimeout;
- this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
- this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CheckpointConfigInfo that = (CheckpointConfigInfo) o;
- return checkpointInterval == that.checkpointInterval &&
- checkpointTimeout == that.checkpointTimeout &&
- minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
- maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
- processingMode == that.processingMode &&
- Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
- }
-
- /**
- * Contains information about the externalized checkpoint configuration.
- */
- public static final class ExternalizedCheckpointInfo {
-
- public static final String FIELD_NAME_ENABLED = "enabled";
-
- public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
-
- @JsonProperty(FIELD_NAME_ENABLED)
- private final boolean enabled;
-
- @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
- private final boolean deleteOnCancellation;
-
- @JsonCreator
- public ExternalizedCheckpointInfo(
- @JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
- @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
- this.enabled = enabled;
- this.deleteOnCancellation = deleteOnCancellation;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
- return enabled == that.enabled &&
- deleteOnCancellation == that.deleteOnCancellation;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(enabled, deleteOnCancellation);
- }
- }
-
- /**
- * Processing mode.
- */
- public enum ProcessingMode {
- AT_LEAST_ONCE,
- EXACTLY_ONCE
- }
-}
[2/3] flink git commit: [FLINK-7709] Add
CheckpointStatisticDetailsHandler for new REST endpoint
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
deleted file mode 100644
index ade8c7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Response of the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatistics implements ResponseBody {
-
- public static final String FIELD_NAME_COUNTS = "counts";
-
- public static final String FIELD_NAME_SUMMARY = "summary";
-
- public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
-
- public static final String FIELD_NAME_HISTORY = "history";
-
- @JsonProperty(FIELD_NAME_COUNTS)
- private final Counts counts;
-
- @JsonProperty(FIELD_NAME_SUMMARY)
- private final Summary summary;
-
- @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
- private final LatestCheckpoints latestCheckpoints;
-
- @JsonProperty(FIELD_NAME_HISTORY)
- private final List<BaseCheckpointStatistics> history;
-
- @JsonCreator
- public CheckpointStatistics(
- @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
- @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
- @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
- @JsonProperty(FIELD_NAME_HISTORY) List<BaseCheckpointStatistics> history) {
- this.counts = Preconditions.checkNotNull(counts);
- this.summary = Preconditions.checkNotNull(summary);
- this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints);
- this.history = Preconditions.checkNotNull(history);
- }
-
- public Counts getCounts() {
- return counts;
- }
-
- public Summary getSummary() {
- return summary;
- }
-
- public LatestCheckpoints getLatestCheckpoints() {
- return latestCheckpoints;
- }
-
- public List<BaseCheckpointStatistics> getHistory() {
- return history;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CheckpointStatistics that = (CheckpointStatistics) o;
- return Objects.equals(counts, that.counts) &&
- Objects.equals(summary, that.summary) &&
- Objects.equals(latestCheckpoints, that.latestCheckpoints) &&
- Objects.equals(history, that.history);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(counts, summary, latestCheckpoints, history);
- }
-
- // ------------------------------------------------------------------
- // Inner classes
- // ------------------------------------------------------------------
-
- /**
- * Checkpoint counts.
- */
- public static final class Counts {
-
- public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored";
-
- public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total";
-
- public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress";
-
- public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed";
-
- public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed";
-
- @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
- private final long numberRestoredCheckpoints;
-
- @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
- private final long totalNumberCheckpoints;
-
- @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
- private final int numberInProgressCheckpoints;
-
- @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
- private final long numberCompletedCheckpoints;
-
- @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
- private final long numberFailedCheckpoints;
-
- @JsonCreator
- public Counts(
- @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints,
- @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints,
- @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints,
- @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints,
- @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) {
- this.numberRestoredCheckpoints = numberRestoredCheckpoints;
- this.totalNumberCheckpoints = totalNumberCheckpoints;
- this.numberInProgressCheckpoints = numberInProgressCheckpoints;
- this.numberCompletedCheckpoints = numberCompletedCheckpoints;
- this.numberFailedCheckpoints = numberFailedCheckpoints;
- }
-
- public long getNumberRestoredCheckpoints() {
- return numberRestoredCheckpoints;
- }
-
- public long getTotalNumberCheckpoints() {
- return totalNumberCheckpoints;
- }
-
- public int getNumberInProgressCheckpoints() {
- return numberInProgressCheckpoints;
- }
-
- public long getNumberCompletedCheckpoints() {
- return numberCompletedCheckpoints;
- }
-
- public long getNumberFailedCheckpoints() {
- return numberFailedCheckpoints;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Counts counts = (Counts) o;
- return numberRestoredCheckpoints == counts.numberRestoredCheckpoints &&
- totalNumberCheckpoints == counts.totalNumberCheckpoints &&
- numberInProgressCheckpoints == counts.numberInProgressCheckpoints &&
- numberCompletedCheckpoints == counts.numberCompletedCheckpoints &&
- numberFailedCheckpoints == counts.numberFailedCheckpoints;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints);
- }
- }
-
- /**
- * Checkpoint summary.
- */
- public static final class Summary {
-
- public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
- public static final String FIELD_NAME_DURATION = "end_to_end_duration";
-
- public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
-
- @JsonProperty(FIELD_NAME_STATE_SIZE)
- private final MinMaxAvgStatistics stateSize;
-
- @JsonProperty(FIELD_NAME_DURATION)
- private final MinMaxAvgStatistics duration;
-
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
- private final MinMaxAvgStatistics alignmentBuffered;
-
- @JsonCreator
- public Summary(
- @JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
- @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) {
- this.stateSize = stateSize;
- this.duration = duration;
- this.alignmentBuffered = alignmentBuffered;
- }
-
- public MinMaxAvgStatistics getStateSize() {
- return stateSize;
- }
-
- public MinMaxAvgStatistics getDuration() {
- return duration;
- }
-
- public MinMaxAvgStatistics getAlignmentBuffered() {
- return alignmentBuffered;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Summary summary = (Summary) o;
- return Objects.equals(stateSize, summary.stateSize) &&
- Objects.equals(duration, summary.duration) &&
- Objects.equals(alignmentBuffered, summary.alignmentBuffered);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(stateSize, duration, alignmentBuffered);
- }
- }
-
- /**
- * Minimum, maximum and average statistics.
- */
- public static final class MinMaxAvgStatistics {
-
- public static final String FIELD_NAME_MINIMUM = "min";
-
- public static final String FIELD_NAME_MAXIMUM = "max";
-
- public static final String FIELD_NAME_AVERAGE = "avg";
-
- @JsonProperty(FIELD_NAME_MINIMUM)
- private final long minimum;
-
- @JsonProperty(FIELD_NAME_MAXIMUM)
- private final long maximum;
-
- @JsonProperty(FIELD_NAME_AVERAGE)
- private final long average;
-
- @JsonCreator
- public MinMaxAvgStatistics(
- @JsonProperty(FIELD_NAME_MINIMUM) long minimum,
- @JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
- @JsonProperty(FIELD_NAME_AVERAGE) long average) {
- this.minimum = minimum;
- this.maximum = maximum;
- this.average = average;
- }
-
- public long getMinimum() {
- return minimum;
- }
-
- public long getMaximum() {
- return maximum;
- }
-
- public long getAverage() {
- return average;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
- return minimum == that.minimum &&
- maximum == that.maximum &&
- average == that.average;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(minimum, maximum, average);
- }
- }
-
- /**
- * Statistics about the latest checkpoints.
- */
- public static final class LatestCheckpoints {
-
- public static final String FIELD_NAME_COMPLETED = "completed";
-
- public static final String FIELD_NAME_SAVEPOINT = "savepoint";
-
- public static final String FIELD_NAME_FAILED = "failed";
-
- public static final String FIELD_NAME_RESTORED = "restored";
-
- @JsonProperty(FIELD_NAME_COMPLETED)
- @Nullable
- private final CompletedCheckpointStatistics completedCheckpointStatistics;
-
- @JsonProperty(FIELD_NAME_SAVEPOINT)
- @Nullable
- private final CompletedCheckpointStatistics savepointStatistics;
-
- @JsonProperty(FIELD_NAME_FAILED)
- @Nullable
- private final FailedCheckpointStatistics failedCheckpointStatistics;
-
- @JsonProperty(FIELD_NAME_RESTORED)
- @Nullable
- private final RestoredCheckpointStatistics restoredCheckpointStatistics;
-
- @JsonCreator
- public LatestCheckpoints(
- @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics,
- @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics,
- @JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics,
- @JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
- this.completedCheckpointStatistics = completedCheckpointStatistics;
- this.savepointStatistics = savepointStatistics;
- this.failedCheckpointStatistics = failedCheckpointStatistics;
- this.restoredCheckpointStatistics = restoredCheckpointStatistics;
- }
-
- @Nullable
- public CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
- return completedCheckpointStatistics;
- }
-
- @Nullable
- public CompletedCheckpointStatistics getSavepointStatistics() {
- return savepointStatistics;
- }
-
- @Nullable
- public FailedCheckpointStatistics getFailedCheckpointStatistics() {
- return failedCheckpointStatistics;
- }
-
- @Nullable
- public RestoredCheckpointStatistics getRestoredCheckpointStatistics() {
- return restoredCheckpointStatistics;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- LatestCheckpoints that = (LatestCheckpoints) o;
- return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) &&
- Objects.equals(savepointStatistics, that.savepointStatistics) &&
- Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) &&
- Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
- }
- }
-
- /**
- * Statistics for a checkpoint.
- */
- @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
- @JsonSubTypes({
- @JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"),
- @JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")})
- public static class BaseCheckpointStatistics {
-
- public static final String FIELD_NAME_ID = "id";
-
- public static final String FIELD_NAME_STATUS = "status";
-
- public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
-
- public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
-
- public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
-
- public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
- public static final String FIELD_NAME_DURATION = "end_to_end_duration";
-
- public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
-
- public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
-
- public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
-
- @JsonProperty(FIELD_NAME_ID)
- private final long id;
-
- @JsonProperty(FIELD_NAME_STATUS)
- private final CheckpointStatsStatus status;
-
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
- private final boolean savepoint;
-
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
- private final long triggerTimestamp;
-
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
- private final long latestAckTimestamp;
-
- @JsonProperty(FIELD_NAME_STATE_SIZE)
- private final long stateSize;
-
- @JsonProperty(FIELD_NAME_DURATION)
- private final long duration;
-
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
- private final long alignmentBuffered;
-
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
- private final int numSubtasks;
-
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
- private final int numAckSubtasks;
-
- @JsonCreator
- protected BaseCheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
- this.id = id;
- this.status = Preconditions.checkNotNull(status);
- this.savepoint = savepoint;
- this.triggerTimestamp = triggerTimestamp;
- this.latestAckTimestamp = latestAckTimestamp;
- this.stateSize = stateSize;
- this.duration = duration;
- this.alignmentBuffered = alignmentBuffered;
- this.numSubtasks = numSubtasks;
- this.numAckSubtasks = numAckSubtasks;
- }
-
- public long getId() {
- return id;
- }
-
- public CheckpointStatsStatus getStatus() {
- return status;
- }
-
- public boolean isSavepoint() {
- return savepoint;
- }
-
- public long getTriggerTimestamp() {
- return triggerTimestamp;
- }
-
- public long getLatestAckTimestamp() {
- return latestAckTimestamp;
- }
-
- public long getStateSize() {
- return stateSize;
- }
-
- public long getDuration() {
- return duration;
- }
-
- public long getAlignmentBuffered() {
- return alignmentBuffered;
- }
-
- public int getNumSubtasks() {
- return numSubtasks;
- }
-
- public int getNumAckSubtasks() {
- return numAckSubtasks;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- BaseCheckpointStatistics that = (BaseCheckpointStatistics) o;
- return id == that.id &&
- savepoint == that.savepoint &&
- triggerTimestamp == that.triggerTimestamp &&
- latestAckTimestamp == that.latestAckTimestamp &&
- stateSize == that.stateSize &&
- duration == that.duration &&
- alignmentBuffered == that.alignmentBuffered &&
- numSubtasks == that.numSubtasks &&
- numAckSubtasks == that.numAckSubtasks &&
- status == that.status;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
- }
- }
-
- /**
- * Statistics for a completed checkpoint.
- */
- public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics {
-
- public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
-
- public static final String FIELD_NAME_DISCARDED = "discarded";
-
- @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
- @Nullable
- private final String externalPath;
-
- @JsonProperty(FIELD_NAME_DISCARDED)
- private final boolean discarded;
-
- @JsonCreator
- public CompletedCheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
- @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
- @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
- super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
-
- this.externalPath = externalPath;
- this.discarded = discarded;
- }
-
- @Nullable
- public String getExternalPath() {
- return externalPath;
- }
-
- public boolean isDiscarded() {
- return discarded;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
- return discarded == that.discarded &&
- Objects.equals(externalPath, that.externalPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), externalPath, discarded);
- }
- }
-
- /**
- * Statistics for a failed checkpoint.
- */
- public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics {
-
- public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
-
- public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
-
- @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
- private final long failureTimestamp;
-
- @JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
- @Nullable
- private final String failureMessage;
-
- @JsonCreator
- public FailedCheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
- @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
- @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
- super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
-
- this.failureTimestamp = failureTimestamp;
- this.failureMessage = failureMessage;
- }
-
- public long getFailureTimestamp() {
- return failureTimestamp;
- }
-
- @Nullable
- public String getFailureMessage() {
- return failureMessage;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
- return failureTimestamp == that.failureTimestamp &&
- Objects.equals(failureMessage, that.failureMessage);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
- }
- }
-
- /**
- * Statistics for a restored checkpoint.
- */
- public static final class RestoredCheckpointStatistics {
-
- public static final String FIELD_NAME_ID = "id";
-
- public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp";
-
- public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
-
- public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
-
- @JsonProperty(FIELD_NAME_ID)
- private final long id;
-
- @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
- private final long restoreTimestamp;
-
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
- private final boolean savepoint;
-
- @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
- @Nullable
- private final String externalPath;
-
- @JsonCreator
- public RestoredCheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
- @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) {
- this.id = id;
- this.restoreTimestamp = restoreTimestamp;
- this.savepoint = savepoint;
- this.externalPath = externalPath;
- }
-
- public long getId() {
- return id;
- }
-
- public long getRestoreTimestamp() {
- return restoreTimestamp;
- }
-
- public boolean isSavepoint() {
- return savepoint;
- }
-
- @Nullable
- public String getExternalPath() {
- return externalPath;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o;
- return id == that.id &&
- restoreTimestamp == that.restoreTimestamp &&
- savepoint == that.savepoint &&
- Objects.equals(externalPath, that.externalPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, restoreTimestamp, savepoint, externalPath);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
deleted file mode 100644
index b062d0d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> {
-
- private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders();
-
- public static final String URL = "/jobs/:jobid/checkpoints";
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public Class<CheckpointStatistics> getResponseClass() {
- return CheckpointStatistics.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public JobMessageParameters getUnresolvedMessageParameters() {
- return new JobMessageParameters();
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.GET;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return URL;
- }
-
- public static CheckpointStatisticsHeaders getInstance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
index 9d74c95..1155892 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -26,7 +26,7 @@ import java.util.Collections;
*/
public class JobMessageParameters extends MessageParameters {
- private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+ protected final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
new file mode 100644
index 0000000..f0526a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+
+ private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
+
+ public static final String URL = "/jobs/:jobid/checkpoints/config";
+
+ private CheckpointConfigHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<CheckpointConfigInfo> getResponseClass() {
+ return CheckpointConfigInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static CheckpointConfigHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
new file mode 100644
index 0000000..797d3a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+ public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+ public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+ public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
+
+ public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
+
+ public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
+
+ @JsonProperty(FIELD_NAME_PROCESSING_MODE)
+ private final ProcessingMode processingMode;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
+ private final long checkpointInterval;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
+ private final long checkpointTimeout;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
+ private final long minPauseBetweenCheckpoints;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
+ private final long maxConcurrentCheckpoints;
+
+ @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
+ private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
+
+ @JsonCreator
+ public CheckpointConfigInfo(
+ @JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
+ @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
+ this.processingMode = Preconditions.checkNotNull(processingMode);
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointTimeout = checkpointTimeout;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointConfigInfo that = (CheckpointConfigInfo) o;
+ return checkpointInterval == that.checkpointInterval &&
+ checkpointTimeout == that.checkpointTimeout &&
+ minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
+ maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
+ processingMode == that.processingMode &&
+ Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
+ }
+
+ /**
+ * Contains information about the externalized checkpoint configuration.
+ */
+ public static final class ExternalizedCheckpointInfo {
+
+ public static final String FIELD_NAME_ENABLED = "enabled";
+
+ public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
+
+ @JsonProperty(FIELD_NAME_ENABLED)
+ private final boolean enabled;
+
+ @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
+ private final boolean deleteOnCancellation;
+
+ @JsonCreator
+ public ExternalizedCheckpointInfo(
+ @JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
+ @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
+ this.enabled = enabled;
+ this.deleteOnCancellation = deleteOnCancellation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
+ return enabled == that.enabled &&
+ deleteOnCancellation == that.deleteOnCancellation;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(enabled, deleteOnCancellation);
+ }
+ }
+
+ /**
+ * Processing mode.
+ */
+ public enum ProcessingMode {
+ AT_LEAST_ONCE,
+ EXACTLY_ONCE
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
new file mode 100644
index 0000000..c08cc82
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/**
+ * Path parameter for the checkpoint id of type {@link Long}.
+ */
+public class CheckpointIdPathParameter extends MessagePathParameter<Long> {
+
+ public static final String KEY = "checkpointid";
+
+ protected CheckpointIdPathParameter() {
+ super(KEY);
+ }
+
+ @Override
+ protected Long convertFromString(String value) throws ConversionException {
+ try {
+ return Long.parseLong(value);
+ } catch (NumberFormatException nfe) {
+ throw new ConversionException("Could not parse long from " + value + '.', nfe);
+ }
+ }
+
+ @Override
+ protected String convertToString(Long value) {
+ return value.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
new file mode 100644
index 0000000..040aa87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for checkpoint related messages.
+ */
+public class CheckpointMessageParameters extends JobMessageParameters {
+
+ protected final CheckpointIdPathParameter checkpointIdPathParameter = new CheckpointIdPathParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Arrays.asList(jobPathParameter, checkpointIdPathParameter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
new file mode 100644
index 0000000..3d7ba2b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for the {@link CheckpointStatisticDetailsHandler}.
+ */
+public class CheckpointStatisticDetailsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> {
+
+ private static final CheckpointStatisticDetailsHeaders INSTANCE = new CheckpointStatisticDetailsHeaders();
+
+ public static final String URL = "/jobs/:jobid/checkpoints/:checkpointid";
+
+ private CheckpointStatisticDetailsHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<CheckpointStatistics> getResponseClass() {
+ return CheckpointStatistics.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public CheckpointMessageParameters getUnresolvedMessageParameters() {
+ return new CheckpointMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static CheckpointStatisticDetailsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
new file mode 100644
index 0000000..9fb1094
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+ @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+public class CheckpointStatistics implements ResponseBody {
+
+ public static final String FIELD_NAME_ID = "id";
+
+ public static final String FIELD_NAME_STATUS = "status";
+
+ public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+ public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
+
+ public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+
+ public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+ public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+ public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+ public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+ public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+ public static final String FIELD_NAME_TASKS = "tasks";
+
+ @JsonProperty(FIELD_NAME_ID)
+ private final long id;
+
+ @JsonProperty(FIELD_NAME_STATUS)
+ private final CheckpointStatsStatus status;
+
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+ private final boolean savepoint;
+
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+ private final long triggerTimestamp;
+
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+ private final long latestAckTimestamp;
+
+ @JsonProperty(FIELD_NAME_STATE_SIZE)
+ private final long stateSize;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+ private final long alignmentBuffered;
+
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+ private final int numSubtasks;
+
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+ private final int numAckSubtasks;
+
+ @JsonProperty(FIELD_NAME_TASKS)
+ @JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+ private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
+
+ @JsonCreator
+ private CheckpointStatistics(
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ this.id = id;
+ this.status = Preconditions.checkNotNull(status);
+ this.savepoint = savepoint;
+ this.triggerTimestamp = triggerTimestamp;
+ this.latestAckTimestamp = latestAckTimestamp;
+ this.stateSize = stateSize;
+ this.duration = duration;
+ this.alignmentBuffered = alignmentBuffered;
+ this.numSubtasks = numSubtasks;
+ this.numAckSubtasks = numAckSubtasks;
+ this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public CheckpointStatsStatus getStatus() {
+ return status;
+ }
+
+ public boolean isSavepoint() {
+ return savepoint;
+ }
+
+ public long getTriggerTimestamp() {
+ return triggerTimestamp;
+ }
+
+ public long getLatestAckTimestamp() {
+ return latestAckTimestamp;
+ }
+
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ public int getNumSubtasks() {
+ return numSubtasks;
+ }
+
+ public int getNumAckSubtasks() {
+ return numAckSubtasks;
+ }
+
+ @Nullable
+ public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
+ return checkpointStatisticsPerTask;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointStatistics that = (CheckpointStatistics) o;
+ return id == that.id &&
+ savepoint == that.savepoint &&
+ triggerTimestamp == that.triggerTimestamp &&
+ latestAckTimestamp == that.latestAckTimestamp &&
+ stateSize == that.stateSize &&
+ duration == that.duration &&
+ alignmentBuffered == that.alignmentBuffered &&
+ numSubtasks == that.numSubtasks &&
+ numAckSubtasks == that.numAckSubtasks &&
+ status == that.status &&
+ Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
+ }
+
+ // -------------------------------------------------------------------------
+ // Static factory methods
+ // -------------------------------------------------------------------------
+
+ public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
+ Preconditions.checkNotNull(checkpointStats);
+
+ Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
+
+ if (includeTaskCheckpointStatistics) {
+ Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
+
+ checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
+
+ for (TaskStateStats taskStateStat : taskStateStats) {
+ checkpointStatisticsPerTask.put(
+ taskStateStat.getJobVertexId(),
+ new TaskCheckpointStatistics(
+ taskStateStat.getLatestAckTimestamp(),
+ taskStateStat.getStateSize(),
+ taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+ taskStateStat.getAlignmentBuffered(),
+ taskStateStat.getNumberOfSubtasks(),
+ taskStateStat.getNumberOfAcknowledgedSubtasks()));
+ }
+ } else {
+ checkpointStatisticsPerTask = Collections.emptyMap();
+ }
+
+ if (checkpointStats instanceof CompletedCheckpointStats) {
+ final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
+
+ return new CheckpointStatistics.CompletedCheckpointStatistics(
+ completedCheckpointStats.getCheckpointId(),
+ completedCheckpointStats.getStatus(),
+ completedCheckpointStats.getProperties().isSavepoint(),
+ completedCheckpointStats.getTriggerTimestamp(),
+ completedCheckpointStats.getLatestAckTimestamp(),
+ completedCheckpointStats.getStateSize(),
+ completedCheckpointStats.getEndToEndDuration(),
+ completedCheckpointStats.getAlignmentBuffered(),
+ completedCheckpointStats.getNumberOfSubtasks(),
+ completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+ checkpointStatisticsPerTask,
+ completedCheckpointStats.getExternalPath(),
+ completedCheckpointStats.isDiscarded());
+ } else if (checkpointStats instanceof FailedCheckpointStats) {
+ final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
+
+ return new CheckpointStatistics.FailedCheckpointStatistics(
+ failedCheckpointStats.getCheckpointId(),
+ failedCheckpointStats.getStatus(),
+ failedCheckpointStats.getProperties().isSavepoint(),
+ failedCheckpointStats.getTriggerTimestamp(),
+ failedCheckpointStats.getLatestAckTimestamp(),
+ failedCheckpointStats.getStateSize(),
+ failedCheckpointStats.getEndToEndDuration(),
+ failedCheckpointStats.getAlignmentBuffered(),
+ failedCheckpointStats.getNumberOfSubtasks(),
+ failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+ checkpointStatisticsPerTask,
+ failedCheckpointStats.getFailureTimestamp(),
+ failedCheckpointStats.getFailureMessage());
+ } else {
+ throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Static inner classes
+ // ---------------------------------------------------------------------
+
+ /**
+ * Checkpoint statistics for a single task.
+ */
+ public static final class TaskCheckpointStatistics {
+
+ public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+
+ public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+ public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+ public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+ public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+ public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+ private final long latestAckTimestamp;
+
+ @JsonProperty(FIELD_NAME_STATE_SIZE)
+ private final long stateSize;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+ private final long alignmentBuffered;
+
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+ private final int numSubtasks;
+
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+ private final int numAckSubtasks;
+
+ @JsonCreator
+ public TaskCheckpointStatistics(
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
+ this.latestAckTimestamp = latestAckTimestamp;
+ this.stateSize = stateSize;
+ this.duration = duration;
+ this.alignmentBuffered = alignmentBuffered;
+ this.numSubtasks = numSubtasks;
+ this.numAckSubtasks = numAckSubtasks;
+ }
+
+ public long getLatestAckTimestamp() {
+ return latestAckTimestamp;
+ }
+
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ public int getNumSubtasks() {
+ return numSubtasks;
+ }
+
+ public int getNumAckSubtasks() {
+ return numAckSubtasks;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
+ return latestAckTimestamp == that.latestAckTimestamp &&
+ stateSize == that.stateSize &&
+ duration == that.duration &&
+ alignmentBuffered == that.alignmentBuffered &&
+ numSubtasks == that.numSubtasks &&
+ numAckSubtasks == that.numAckSubtasks;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+ }
+ }
+
+ /**
+ * Statistics for a completed checkpoint.
+ */
+ public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
+
+ public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+ public static final String FIELD_NAME_DISCARDED = "discarded";
+
+ @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+ @Nullable
+ private final String externalPath;
+
+ @JsonProperty(FIELD_NAME_DISCARDED)
+ private final boolean discarded;
+
+ @JsonCreator
+ public CompletedCheckpointStatistics(
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+ @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
+ @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
+ super(
+ id,
+ status,
+ savepoint,
+ triggerTimestamp,
+ latestAckTimestamp,
+ stateSize,
+ duration,
+ alignmentBuffered,
+ numSubtasks,
+ numAckSubtasks,
+ checkpointingStatisticsPerTask);
+
+ this.externalPath = externalPath;
+ this.discarded = discarded;
+ }
+
+ @Nullable
+ public String getExternalPath() {
+ return externalPath;
+ }
+
+ public boolean isDiscarded() {
+ return discarded;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
+ return discarded == that.discarded &&
+ Objects.equals(externalPath, that.externalPath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), externalPath, discarded);
+ }
+ }
+
+ /**
+ * Statistics for a failed checkpoint.
+ */
+ public static final class FailedCheckpointStatistics extends CheckpointStatistics {
+
+ public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
+
+ public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
+
+ @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
+ private final long failureTimestamp;
+
+ @JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
+ @Nullable
+ private final String failureMessage;
+
+ @JsonCreator
+ public FailedCheckpointStatistics(
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+ @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
+ @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
+ super(
+ id,
+ status,
+ savepoint,
+ triggerTimestamp,
+ latestAckTimestamp,
+ stateSize,
+ duration,
+ alignmentBuffered,
+ numSubtasks,
+ numAckSubtasks,
+ checkpointingStatisticsPerTask);
+
+ this.failureTimestamp = failureTimestamp;
+ this.failureMessage = failureMessage;
+ }
+
+ public long getFailureTimestamp() {
+ return failureTimestamp;
+ }
+
+ @Nullable
+ public String getFailureMessage() {
+ return failureMessage;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
+ return failureTimestamp == that.failureTimestamp &&
+ Objects.equals(failureMessage, that.failureMessage);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
new file mode 100644
index 0000000..1f00fcc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointingStatisticsHandler}. This class contains information about
+ * the checkpointing of a given job.
+ */
+public class CheckpointingStatistics implements ResponseBody {
+
+ public static final String FIELD_NAME_COUNTS = "counts";
+
+ public static final String FIELD_NAME_SUMMARY = "summary";
+
+ public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+ public static final String FIELD_NAME_HISTORY = "history";
+
+ @JsonProperty(FIELD_NAME_COUNTS)
+ private final Counts counts;
+
+ @JsonProperty(FIELD_NAME_SUMMARY)
+ private final Summary summary;
+
+ @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+ private final LatestCheckpoints latestCheckpoints;
+
+ @JsonProperty(FIELD_NAME_HISTORY)
+ private final List<CheckpointStatistics> history;
+
+ @JsonCreator
+ public CheckpointingStatistics(
+ @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+ @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+ @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
+ @JsonProperty(FIELD_NAME_HISTORY) List<CheckpointStatistics> history) {
+ this.counts = Preconditions.checkNotNull(counts);
+ this.summary = Preconditions.checkNotNull(summary);
+ this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints);
+ this.history = Preconditions.checkNotNull(history);
+ }
+
+ public Counts getCounts() {
+ return counts;
+ }
+
+ public Summary getSummary() {
+ return summary;
+ }
+
+ public LatestCheckpoints getLatestCheckpoints() {
+ return latestCheckpoints;
+ }
+
+ public List<CheckpointStatistics> getHistory() {
+ return history;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointingStatistics that = (CheckpointingStatistics) o;
+ return Objects.equals(counts, that.counts) &&
+ Objects.equals(summary, that.summary) &&
+ Objects.equals(latestCheckpoints, that.latestCheckpoints) &&
+ Objects.equals(history, that.history);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(counts, summary, latestCheckpoints, history);
+ }
+
+ // ------------------------------------------------------------------
+ // Inner classes
+ // ------------------------------------------------------------------
+
+ /**
+ * Checkpoint counts.
+ */
+ public static final class Counts {
+
+ public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored";
+
+ public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total";
+
+ public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress";
+
+ public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed";
+
+ public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed";
+
+ @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
+ private final long numberRestoredCheckpoints;
+
+ @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
+ private final long totalNumberCheckpoints;
+
+ @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
+ private final int numberInProgressCheckpoints;
+
+ @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
+ private final long numberCompletedCheckpoints;
+
+ @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
+ private final long numberFailedCheckpoints;
+
+ @JsonCreator
+ public Counts(
+ @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints,
+ @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints,
+ @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints,
+ @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints,
+ @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) {
+ this.numberRestoredCheckpoints = numberRestoredCheckpoints;
+ this.totalNumberCheckpoints = totalNumberCheckpoints;
+ this.numberInProgressCheckpoints = numberInProgressCheckpoints;
+ this.numberCompletedCheckpoints = numberCompletedCheckpoints;
+ this.numberFailedCheckpoints = numberFailedCheckpoints;
+ }
+
+ public long getNumberRestoredCheckpoints() {
+ return numberRestoredCheckpoints;
+ }
+
+ public long getTotalNumberCheckpoints() {
+ return totalNumberCheckpoints;
+ }
+
+ public int getNumberInProgressCheckpoints() {
+ return numberInProgressCheckpoints;
+ }
+
+ public long getNumberCompletedCheckpoints() {
+ return numberCompletedCheckpoints;
+ }
+
+ public long getNumberFailedCheckpoints() {
+ return numberFailedCheckpoints;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Counts counts = (Counts) o;
+ return numberRestoredCheckpoints == counts.numberRestoredCheckpoints &&
+ totalNumberCheckpoints == counts.totalNumberCheckpoints &&
+ numberInProgressCheckpoints == counts.numberInProgressCheckpoints &&
+ numberCompletedCheckpoints == counts.numberCompletedCheckpoints &&
+ numberFailedCheckpoints == counts.numberFailedCheckpoints;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints);
+ }
+ }
+
+ /**
+ * Checkpoint summary.
+ */
+ public static final class Summary {
+
+ public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+ public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+ public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+ @JsonProperty(FIELD_NAME_STATE_SIZE)
+ private final MinMaxAvgStatistics stateSize;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final MinMaxAvgStatistics duration;
+
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+ private final MinMaxAvgStatistics alignmentBuffered;
+
+ @JsonCreator
+ public Summary(
+ @JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) {
+ this.stateSize = stateSize;
+ this.duration = duration;
+ this.alignmentBuffered = alignmentBuffered;
+ }
+
+ public MinMaxAvgStatistics getStateSize() {
+ return stateSize;
+ }
+
+ public MinMaxAvgStatistics getDuration() {
+ return duration;
+ }
+
+ public MinMaxAvgStatistics getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Summary summary = (Summary) o;
+ return Objects.equals(stateSize, summary.stateSize) &&
+ Objects.equals(duration, summary.duration) &&
+ Objects.equals(alignmentBuffered, summary.alignmentBuffered);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stateSize, duration, alignmentBuffered);
+ }
+ }
+
+ /**
+ * Minimum, maximum and average statistics.
+ */
+ public static final class MinMaxAvgStatistics {
+
+ public static final String FIELD_NAME_MINIMUM = "min";
+
+ public static final String FIELD_NAME_MAXIMUM = "max";
+
+ public static final String FIELD_NAME_AVERAGE = "avg";
+
+ @JsonProperty(FIELD_NAME_MINIMUM)
+ private final long minimum;
+
+ @JsonProperty(FIELD_NAME_MAXIMUM)
+ private final long maximum;
+
+ @JsonProperty(FIELD_NAME_AVERAGE)
+ private final long average;
+
+ @JsonCreator
+ public MinMaxAvgStatistics(
+ @JsonProperty(FIELD_NAME_MINIMUM) long minimum,
+ @JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
+ @JsonProperty(FIELD_NAME_AVERAGE) long average) {
+ this.minimum = minimum;
+ this.maximum = maximum;
+ this.average = average;
+ }
+
+ public long getMinimum() {
+ return minimum;
+ }
+
+ public long getMaximum() {
+ return maximum;
+ }
+
+ public long getAverage() {
+ return average;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
+ return minimum == that.minimum &&
+ maximum == that.maximum &&
+ average == that.average;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(minimum, maximum, average);
+ }
+ }
+
+ /**
+ * Statistics about the latest checkpoints.
+ */
+ public static final class LatestCheckpoints {
+
+ public static final String FIELD_NAME_COMPLETED = "completed";
+
+ public static final String FIELD_NAME_SAVEPOINT = "savepoint";
+
+ public static final String FIELD_NAME_FAILED = "failed";
+
+ public static final String FIELD_NAME_RESTORED = "restored";
+
+ @JsonProperty(FIELD_NAME_COMPLETED)
+ @Nullable
+ private final CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics;
+
+ @JsonProperty(FIELD_NAME_SAVEPOINT)
+ @Nullable
+ private final CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics;
+
+ @JsonProperty(FIELD_NAME_FAILED)
+ @Nullable
+ private final CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics;
+
+ @JsonProperty(FIELD_NAME_RESTORED)
+ @Nullable
+ private final RestoredCheckpointStatistics restoredCheckpointStatistics;
+
+ @JsonCreator
+ public LatestCheckpoints(
+ @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics,
+ @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics,
+ @JsonProperty(FIELD_NAME_FAILED) @Nullable CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics,
+ @JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
+ this.completedCheckpointStatistics = completedCheckpointStatistics;
+ this.savepointStatistics = savepointStatistics;
+ this.failedCheckpointStatistics = failedCheckpointStatistics;
+ this.restoredCheckpointStatistics = restoredCheckpointStatistics;
+ }
+
+ @Nullable
+ public CheckpointStatistics.CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
+ return completedCheckpointStatistics;
+ }
+
+ @Nullable
+ public CheckpointStatistics.CompletedCheckpointStatistics getSavepointStatistics() {
+ return savepointStatistics;
+ }
+
+ @Nullable
+ public CheckpointStatistics.FailedCheckpointStatistics getFailedCheckpointStatistics() {
+ return failedCheckpointStatistics;
+ }
+
+ @Nullable
+ public RestoredCheckpointStatistics getRestoredCheckpointStatistics() {
+ return restoredCheckpointStatistics;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LatestCheckpoints that = (LatestCheckpoints) o;
+ return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) &&
+ Objects.equals(savepointStatistics, that.savepointStatistics) &&
+ Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) &&
+ Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
+ }
+ }
+
+ /**
+ * Statistics for a restored checkpoint.
+ */
+ public static final class RestoredCheckpointStatistics {
+
+ public static final String FIELD_NAME_ID = "id";
+
+ public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp";
+
+ public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+ public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+ @JsonProperty(FIELD_NAME_ID)
+ private final long id;
+
+ @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
+ private final long restoreTimestamp;
+
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+ private final boolean savepoint;
+
+ @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+ @Nullable
+ private final String externalPath;
+
+ @JsonCreator
+ public RestoredCheckpointStatistics(
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) {
+ this.id = id;
+ this.restoreTimestamp = restoreTimestamp;
+ this.savepoint = savepoint;
+ this.externalPath = externalPath;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getRestoreTimestamp() {
+ return restoreTimestamp;
+ }
+
+ public boolean isSavepoint() {
+ return savepoint;
+ }
+
+ @Nullable
+ public String getExternalPath() {
+ return externalPath;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o;
+ return id == that.id &&
+ restoreTimestamp == that.restoreTimestamp &&
+ savepoint == that.savepoint &&
+ Objects.equals(externalPath, that.externalPath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, restoreTimestamp, savepoint, externalPath);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
new file mode 100644
index 0000000..ce809e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointingStatisticsHandler}.
+ */
+public class CheckpointingStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> {
+
+ private static final CheckpointingStatisticsHeaders INSTANCE = new CheckpointingStatisticsHeaders();
+
+ public static final String URL = "/jobs/:jobid/checkpoints";
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<CheckpointingStatistics> getResponseClass() {
+ return CheckpointingStatistics.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static CheckpointingStatisticsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
new file mode 100644
index 0000000..b1f083f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer for {@link JobVertexID}.
+ */
+public class JobVertexIDDeserializer extends KeyDeserializer {
+
+ @Override
+ public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
+ return JobVertexID.fromHexString(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
new file mode 100644
index 0000000..f2b9859
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer for {@link JobVertexID}.
+ */
+public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
+
+ private static final long serialVersionUID = 2970050507628933522L;
+
+ public JobVertexIDSerializer() {
+ super(JobVertexID.class);
+ }
+
+ @Override
+ public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeFieldName(value.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
index 04b1c55..7337772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 1eac20b..263117a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;