You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:49:42 UTC
[03/11] flink git commit: [FLINK-4410] [runtime-web] Add detailed
checkpoint stats handlers
[FLINK-4410] [runtime-web] Add detailed checkpoint stats handlers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dec0d6bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dec0d6bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dec0d6bb
Branch: refs/heads/master
Commit: dec0d6bb702cfcbf30e60f6011565fd9455bb121
Parents: 579bc96
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 23 20:44:12 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Jan 10 09:48:52 2017 +0100
----------------------------------------------------------------------
.../runtime/webmonitor/WebRuntimeMonitor.java | 21 +-
.../AbstractJobVertexRequestHandler.java | 34 +-
.../checkpoints/CheckpointConfigHandler.java | 77 +++++
.../checkpoints/CheckpointStatsCache.java | 80 +++++
.../CheckpointStatsDetailsHandler.java | 153 +++++++++
.../CheckpointStatsDetailsSubtasksHandler.java | 189 ++++++++++
.../checkpoints/CheckpointStatsHandler.java | 235 +++++++++++++
.../CheckpointConfigHandlerTest.java | 146 ++++++++
.../checkpoints/CheckpointStatsCacheTest.java | 67 ++++
.../CheckpointStatsDetailsHandlerTest.java | 286 ++++++++++++++++
.../checkpoints/CheckpointStatsHandlerTest.java | 303 ++++++++++++++++
...heckpointStatsSubtaskDetailsHandlerTest.java | 342 +++++++++++++++++++
12 files changed, 1916 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index c5b7d35..3080b57 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -53,7 +53,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
-import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
@@ -62,7 +61,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
@@ -73,6 +71,11 @@ import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandl
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
@@ -273,7 +276,6 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
- .GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
currentGraphs,
backPressureStatsTracker,
@@ -288,7 +290,6 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
- .GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))
.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
@@ -328,6 +329,18 @@ public class WebRuntimeMonitor implements WebMonitor {
// DELETE is the preferred way of stopping a job (Rest-conform)
.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
+ int maxCachedEntries = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+ CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
+
+ // Register the checkpoint stats handlers
+ router
+ .GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs)))
+ .GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs)))
+ .GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
+ .GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
+
if (webSubmitAllow) {
router
// fetch the list of uploaded jars.
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index a36f94a..38243e5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -37,26 +37,34 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
@Override
public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- final String vidString = params.get("vertexid");
- if (vidString == null) {
- throw new IllegalArgumentException("vertexId parameter missing");
- }
-
- final JobVertexID vid;
- try {
- vid = JobVertexID.fromHexString(vidString);
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
- }
+ final JobVertexID vid = parseJobVertexId(params);
final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
if (jobVertex == null) {
- throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
+ throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists.");
}
return handleRequest(jobVertex, params);
}
+
+ /**
+ * Returns the job vertex ID parsed from the provided parameters.
+ *
+ * @param params Path parameters
+ * @return Parsed job vertex ID or <code>null</code> if the parameter is missing or invalid.
+ */
+ public static JobVertexID parseJobVertexId(Map<String, String> params) {
+ String jobVertexIdParam = params.get("vertexid");
+ if (jobVertexIdParam == null) {
+ return null;
+ }
+
+ try {
+ return JobVertexID.fromHexString(jobVertexIdParam);
+ } catch (RuntimeException ignored) {
+ return null;
+ }
+ }
public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..1ad5e65
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handler that returns a job's snapshotting settings.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+ CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+ JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+
+ gen.writeStartObject();
+ {
+ gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
+ gen.writeNumberField("interval", settings.getCheckpointInterval());
+ gen.writeNumberField("timeout", settings.getCheckpointTimeout());
+ gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
+ gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
+
+ ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
+ gen.writeObjectFieldStart("externalization");
+ {
+ if (externalization.externalizeCheckpoints()) {
+ gen.writeBooleanField("enabled", true);
+ gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+ } else {
+ gen.writeBooleanField("enabled", false);
+ }
+ }
+ gen.writeEndObject();
+
+ }
+ gen.writeEndObject();
+
+ gen.close();
+
+ return writer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..35d529a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-bounded cache of accessed stats for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example, if you manage to click on the last
+ * checkpoint in the history, it is no longer available via the stats as
+ * soon as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+ @Nullable
+ private final Cache<Long, AbstractCheckpointStats> cache;
+
+ public CheckpointStatsCache(int maxNumEntries) {
+ if (maxNumEntries > 0) {
+ this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+ .maximumSize(maxNumEntries)
+ .build();
+ } else {
+ this.cache = null;
+ }
+ }
+
+ /**
+ * Try to add the checkpoint to the cache.
+ *
+ * @param checkpoint Checkpoint to be added.
+ */
+ void tryAdd(AbstractCheckpointStats checkpoint) {
+ // Don't add in progress checkpoints as they will be replaced by their
+ // completed/failed version eventually.
+ if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
+ cache.put(checkpoint.getCheckpointId(), checkpoint);
+ }
+ }
+
+ /**
+ * Try to look up a checkpoint by its ID in the cache.
+ *
+ * @param checkpointId ID of the checkpoint to look up.
+ * @return The checkpoint or <code>null</code> if checkpoint not found.
+ */
+ AbstractCheckpointStats tryGet(long checkpointId) {
+ if (cache != null) {
+ return cache.getIfPresent(checkpointId);
+ } else {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
new file mode 100644
index 0000000..6bb8300
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Request handler that returns the detailed stats of a single checkpoint, including a summary per task.
+ */
+public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+ private final CheckpointStatsCache cache;
+
+ public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
+ super(executionGraphHolder);
+ this.cache = cache;
+ }
+
+ @Override
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ long checkpointId = parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return "{}";
+ }
+
+ CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+ CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return "{}";
+ }
+ }
+
+ return writeResponse(checkpoint);
+ }
+
+ private String writeResponse(AbstractCheckpointStats checkpoint) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ gen.writeStartObject();
+
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+ if (checkpoint.getStatus().isCompleted()) {
+ // --- Completed ---
+ CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+
+ gen.writeBooleanField("discarded", completed.isDiscarded());
+ }
+ else if (checkpoint.getStatus().isFailed()) {
+ // --- Failed ---
+ FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ }
+
+ gen.writeObjectFieldStart("tasks");
+ for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+ gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
+
+ gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", taskStats.getStateSize());
+ gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+ gen.writeEndObject();
+ }
+ gen.writeEndObject();
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+ /**
+ * Returns the checkpoint ID parsed from the provided parameters.
+ *
+ * @param params Path parameters
+ * @return Parsed checkpoint ID or <code>-1</code> if the parameter is missing or not a valid number.
+ */
+ static long parseCheckpointId(Map<String, String> params) {
+ String param = params.get("checkpointid");
+ if (param == null) {
+ return -1;
+ }
+
+ try {
+ return Long.parseLong(param);
+ } catch (NumberFormatException ignored) {
+ return -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..3e3088b
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
+
+ private final CheckpointStatsCache cache;
+
+ public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
+ super(executionGraphHolder);
+ this.cache = checkNotNull(cache);
+ }
+
+ @Override
+ public String handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ ActorGateway jobManager) throws Exception {
+ return super.handleJsonRequest(pathParams, queryParams, jobManager);
+ }
+
+ @Override
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return "{}";
+ }
+
+ JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
+ if (vertexId == null) {
+ return "{}";
+ }
+
+ CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+ CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return "{}";
+ }
+ }
+
+ return writeResponse(checkpoint, vertexId);
+ }
+
+ private String writeResponse(AbstractCheckpointStats checkpoint, JobVertexID vertexId) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ gen.writeStartObject();
+
+ TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
+ if (taskStats == null) {
+ return "{}";
+ }
+
+ // Overview
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", taskStats.getStateSize());
+ gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+ if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+ gen.writeObjectFieldStart("summary");
+ gen.writeObjectFieldStart("state_size");
+ writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("end_to_end_duration");
+ MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
+ gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("checkpoint_duration");
+ gen.writeObjectFieldStart("sync");
+ writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+ gen.writeEndObject();
+ gen.writeObjectFieldStart("async");
+ writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment");
+ gen.writeObjectFieldStart("buffered");
+ writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
+ gen.writeEndObject();
+ gen.writeObjectFieldStart("duration");
+ writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+ gen.writeEndObject();
+ }
+
+ SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
+
+ gen.writeArrayFieldStart("subtasks");
+ for (int i = 0; i < subtasks.length; i++) {
+ SubtaskStateStats subtask = subtasks[i];
+
+ gen.writeStartObject();
+ gen.writeNumberField("index", i);
+
+ if (subtask != null) {
+ gen.writeStringField("status", "completed");
+ gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
+ gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("state_size", subtask.getStateSize());
+
+ gen.writeObjectFieldStart("checkpoint");
+ gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
+ gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment");
+ gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
+ gen.writeNumberField("duration", subtask.getAlignmentDuration());
+ gen.writeEndObject();
+ } else {
+ gen.writeStringField("status", "pending");
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+ private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+ gen.writeNumberField("min", minMaxAvg.getMinimum());
+ gen.writeNumberField("max", minMaxAvg.getMaximum());
+ gen.writeNumberField("avg", minMaxAvg.getAverage());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..71f3637
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ */
+public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
+
+ public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
+ super(executionGraphHolder);
+ }
+
+ @Override
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+ CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
+ CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+
+ gen.writeStartObject();
+
+ // Counts
+ writeCounts(gen, snapshot.getCounts());
+
+ // Summary
+ writeSummary(gen, snapshot.getSummaryStats());
+
+ CheckpointStatsHistory history = snapshot.getHistory();
+
+ // Latest
+ writeLatestCheckpoints(
+ gen,
+ history.getLatestCompletedCheckpoint(),
+ history.getLatestSavepoint(),
+ history.getLatestFailedCheckpoint(),
+ snapshot.getLatestRestoredCheckpoint());
+
+ // History
+ writeHistory(gen, snapshot.getHistory());
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+ private void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+ gen.writeObjectFieldStart("counts");
+ gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
+ gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
+ gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
+ gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
+ gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+ gen.writeEndObject();
+ }
+
+ private void writeSummary(
+ JsonGenerator gen,
+ CompletedCheckpointStatsSummary summary) throws IOException {
+ gen.writeObjectFieldStart("summary");
+ gen.writeObjectFieldStart("state_size");
+ writeMinMaxAvg(gen, summary.getStateSizeStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("end_to_end_duration");
+ writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment_buffered");
+ writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+ }
+
+ private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+ gen.writeNumberField("min", minMaxAvg.getMinimum());
+ gen.writeNumberField("max", minMaxAvg.getMaximum());
+ gen.writeNumberField("avg", minMaxAvg.getAverage());
+ }
+
+ private void writeLatestCheckpoints(
+ JsonGenerator gen,
+ @Nullable CompletedCheckpointStats completed,
+ @Nullable CompletedCheckpointStats savepoint,
+ @Nullable FailedCheckpointStats failed,
+ @Nullable RestoredCheckpointStats restored) throws IOException {
+
+ gen.writeObjectFieldStart("latest");
+ // Completed checkpoint
+ if (completed != null) {
+ gen.writeObjectFieldStart("completed");
+ writeCheckpoint(gen, completed);
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", completed.getExternalPath());
+ }
+
+ gen.writeEndObject();
+ }
+
+ // Completed savepoint
+ if (savepoint != null) {
+ gen.writeObjectFieldStart("savepoint");
+ writeCheckpoint(gen, savepoint);
+
+ String externalPath = savepoint.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", savepoint.getExternalPath());
+ }
+ gen.writeEndObject();
+ }
+
+ // Failed checkpoint
+ if (failed != null) {
+ gen.writeObjectFieldStart("failed");
+ writeCheckpoint(gen, failed);
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ gen.writeEndObject();
+ }
+
+ // Restored checkpoint
+ if (restored != null) {
+ gen.writeObjectFieldStart("restored");
+ gen.writeNumberField("id", restored.getCheckpointId());
+ gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
+ gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties()));
+
+ String externalPath = restored.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndObject();
+ }
+
+ private void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+
+ }
+
+ private void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+ gen.writeArrayFieldStart("history");
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ gen.writeStartObject();
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+ if (checkpoint.getStatus().isCompleted()) {
+ // --- Completed ---
+ CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+
+ gen.writeBooleanField("discarded", completed.isDiscarded());
+ }
+ else if (checkpoint.getStatus().isFailed()) {
+ // --- Failed ---
+ FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ }
+
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
new file mode 100644
index 0000000..410e044
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointConfigHandlerTest {
+
+ /**
+ * Tests a simple config.
+ */
+ @Test
+ public void testSimpleConfig() throws Exception {
+ long interval = 18231823L;
+ long timeout = 996979L;
+ long minPause = 119191919L;
+ int maxConcurrent = 12929329;
+ ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none();
+
+ JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ interval,
+ timeout,
+ minPause,
+ maxConcurrent,
+ externalized,
+ true);
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+ when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(json);
+
+ assertEquals("exactly_once", rootNode.get("mode").asText());
+ assertEquals(interval, rootNode.get("interval").asLong());
+ assertEquals(timeout, rootNode.get("timeout").asLong());
+ assertEquals(minPause, rootNode.get("min_pause").asLong());
+ assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt());
+
+ JsonNode externalizedNode = rootNode.get("externalization");
+ assertNotNull(externalizedNode);
+ assertEquals(false, externalizedNode.get("enabled").asBoolean());
+ }
+
+ /**
+ * Tests the that the isExactlyOnce flag is respected.
+ */
+ @Test
+ public void testAtLeastOnce() throws Exception {
+ JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ 996979L,
+ 1818L,
+ 1212L,
+ 12,
+ ExternalizedCheckpointSettings.none(),
+ false); // at least once
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+ when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(json);
+
+ assertEquals("at_least_once", rootNode.get("mode").asText());
+ }
+
+ /**
+ * Tests that the externalized checkpoint settings are forwarded.
+ */
+ @Test
+ public void testEnabledExternalizedCheckpointSettings() throws Exception {
+ ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+
+ JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ 996979L,
+ 1818L,
+ 1212L,
+ 12,
+ externalizedSettings,
+ false); // at least once
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+ when(tracker.getSnapshottingSettings()).thenReturn(settings);
+
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+ assertNotNull(externalizedNode);
+ assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+ assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
new file mode 100644
index 0000000..0fada97
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CheckpointStatsCache}.
+ */
+public class CheckpointStatsCacheTest {
+
+    /**
+     * A cache created with size 0 is expected to return nothing for an added checkpoint.
+     */
+    @Test
+    public void testZeroSizeCache() throws Exception {
+        AbstractCheckpointStats completed = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+        CheckpointStatsCache emptyCache = new CheckpointStatsCache(0);
+
+        emptyCache.tryAdd(completed);
+
+        assertNull(emptyCache.tryGet(0L));
+    }
+
+    /**
+     * Adds two completed and one in-progress checkpoint to a cache created with size 1
+     * and checks which of them can be looked up after each step.
+     */
+    @Test
+    public void testCacheAddAndGet() throws Exception {
+        AbstractCheckpointStats firstCompleted = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+        AbstractCheckpointStats secondCompleted = createCheckpoint(1, CheckpointStatsStatus.COMPLETED);
+        AbstractCheckpointStats inProgress = createCheckpoint(2, CheckpointStatsStatus.IN_PROGRESS);
+
+        CheckpointStatsCache cache = new CheckpointStatsCache(1);
+
+        // The first completed checkpoint is retrievable right after it was added.
+        cache.tryAdd(firstCompleted);
+        assertEquals(firstCompleted, cache.tryGet(0));
+
+        // The second completed checkpoint is expected to replace the first one.
+        cache.tryAdd(secondCompleted);
+        assertNull(cache.tryGet(0));
+        assertEquals(secondCompleted, cache.tryGet(1));
+
+        // The in-progress checkpoint is expected to leave the cache contents untouched.
+        cache.tryAdd(inProgress);
+        assertNull(cache.tryGet(2));
+        assertNull(cache.tryGet(0));
+        assertEquals(secondCompleted, cache.tryGet(1));
+    }
+
+    /**
+     * Creates a mocked checkpoint that reports the given ID and status.
+     */
+    private AbstractCheckpointStats createCheckpoint(long id, CheckpointStatsStatus status) {
+        AbstractCheckpointStats stats = mock(AbstractCheckpointStats.class);
+        when(stats.getCheckpointId()).thenReturn(id);
+        when(stats.getStatus()).thenReturn(status);
+        return stats;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
new file mode 100644
index 0000000..17c8558
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CheckpointStatsDetailsHandler}.
+ */
+public class CheckpointStatsDetailsHandlerTest {
+
+    /**
+     * Tests request with illegal checkpoint ID param.
+     */
+    @Test
+    public void testIllegalCheckpointId() throws Exception {
+        AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+
+        String json = createHandler().handleRequest(graph, checkpointIdParams("illegal checkpoint"));
+
+        assertEquals("{}", json);
+    }
+
+    /**
+     * Tests request with missing checkpoint ID param.
+     */
+    @Test
+    public void testNoCheckpointIdParam() throws Exception {
+        AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+
+        String json = createHandler().handleRequest(graph, Collections.<String, String>emptyMap());
+
+        assertEquals("{}", json);
+    }
+
+    /**
+     * Test lookup of not existing checkpoint in history.
+     */
+    @Test
+    public void testCheckpointNotFound() throws Exception {
+        CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+        when(history.getCheckpointById(anyLong())).thenReturn(null); // not found
+
+        AccessExecutionGraph graph = createGraph(history);
+
+        String json = createHandler().handleRequest(graph, checkpointIdParams("123"));
+
+        assertEquals("{}", json);
+        verify(history, times(1)).getCheckpointById(anyLong());
+    }
+
+    /**
+     * Tests a checkpoint details request for an in progress checkpoint.
+     */
+    @Test
+    public void testCheckpointDetailsRequestInProgressCheckpoint() throws Exception {
+        PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+        when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+        when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+        when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+        when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
+        when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+        when(checkpoint.getStateSize()).thenReturn(111939272822L);
+        when(checkpoint.getEndToEndDuration()).thenReturn(121191L);
+        when(checkpoint.getAlignmentBuffered()).thenReturn(1L);
+        when(checkpoint.getNumberOfSubtasks()).thenReturn(501);
+        when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+        // Create the task mocks before stubbing with them to keep the stubbings separate.
+        List<TaskStateStats> taskStats = createTaskStateStatsList();
+        when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+        JsonNode rootNode = triggerRequest(checkpoint);
+
+        verifyCommonFields(checkpoint, rootNode);
+        verifyTaskNodes(taskStats, rootNode);
+    }
+
+    /**
+     * Tests a checkpoint details request for a completed checkpoint.
+     */
+    @Test
+    public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+        CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
+        when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+        when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+        when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+        when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+        when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+        when(checkpoint.getStateSize()).thenReturn(925281L);
+        when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+        when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+        when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+        when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+        when(checkpoint.isDiscarded()).thenReturn(true);
+        when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path");
+
+        // Create the task mocks before stubbing with them to keep the stubbings separate.
+        List<TaskStateStats> taskStats = createTaskStateStatsList();
+        when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+        JsonNode rootNode = triggerRequest(checkpoint);
+
+        verifyCommonFields(checkpoint, rootNode);
+        assertEquals(checkpoint.isDiscarded(), rootNode.get("discarded").asBoolean());
+        assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
+        verifyTaskNodes(taskStats, rootNode);
+    }
+
+    /**
+     * Tests a checkpoint details request for a failed checkpoint.
+     */
+    @Test
+    public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+        FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
+        when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+        when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+        when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+        when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+        when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+        when(checkpoint.getStateSize()).thenReturn(925281L);
+        when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+        when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+        when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+        when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+        when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L);
+        when(checkpoint.getFailureMessage()).thenReturn("failure-message");
+
+        // Create the task mocks before stubbing with them to keep the stubbings separate.
+        List<TaskStateStats> taskStats = createTaskStateStatsList();
+        when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+        JsonNode rootNode = triggerRequest(checkpoint);
+
+        verifyCommonFields(checkpoint, rootNode);
+        assertEquals(checkpoint.getFailureTimestamp(), rootNode.get("failure_timestamp").asLong());
+        assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
+        verifyTaskNodes(taskStats, rootNode);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Requests the details of the given checkpoint and returns the parsed JSON response.
+     *
+     * <p>The mocked history returns the given checkpoint for every checkpoint ID.
+     */
+    static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+        CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+        when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
+
+        AccessExecutionGraph graph = createGraph(history);
+
+        String json = createHandler().handleRequest(graph, checkpointIdParams("123"));
+
+        return new ObjectMapper().readTree(json);
+    }
+
+    /**
+     * Verifies the entry of the given task in the {@code tasks} object of the parent node.
+     */
+    static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
+        // Arbitrary offset for the trigger timestamp handed to the duration getter.
+        long duration = ThreadLocalRandom.current().nextInt(128);
+        long triggerTimestamp = task.getLatestAckTimestamp() - duration;
+
+        JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+
+        assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+        assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+        assertEquals(task.getEndToEndDuration(triggerTimestamp), taskNode.get("end_to_end_duration").asLong());
+        assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+        assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+        assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+    }
+
+    /**
+     * Verifies the fields that are asserted for every checkpoint status.
+     */
+    private static void verifyCommonFields(AbstractCheckpointStats checkpoint, JsonNode rootNode) {
+        assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+        assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+        assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean());
+        assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+        assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+        assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+        assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+        assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+        assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+        assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+    }
+
+    /**
+     * Verifies the task node of every given task.
+     */
+    private static void verifyTaskNodes(List<TaskStateStats> tasks, JsonNode rootNode) {
+        for (TaskStateStats task : tasks) {
+            verifyTaskNode(task, rootNode);
+        }
+    }
+
+    /**
+     * Creates a handler with a mocked graph holder and a stats cache created with size 0.
+     */
+    private static CheckpointStatsDetailsHandler createHandler() {
+        return new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+    }
+
+    /**
+     * Creates a mocked graph whose stats tracker creates snapshots with the given history.
+     */
+    private static AccessExecutionGraph createGraph(CheckpointStatsHistory history) {
+        CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+        when(snapshot.getHistory()).thenReturn(history);
+
+        CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+        when(tracker.createSnapshot()).thenReturn(snapshot);
+
+        AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+        when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+        return graph;
+    }
+
+    /**
+     * Creates request parameters that only contain the given {@code checkpointid} value.
+     */
+    private static Map<String, String> checkpointIdParams(String checkpointId) {
+        Map<String, String> params = new HashMap<>();
+        params.put("checkpointid", checkpointId);
+        return params;
+    }
+
+    /**
+     * Creates a list with two mocked task stats.
+     */
+    private static List<TaskStateStats> createTaskStateStatsList() {
+        List<TaskStateStats> taskStats = new ArrayList<>();
+        taskStats.add(createTaskStateStats());
+        taskStats.add(createTaskStateStats());
+        return taskStats;
+    }
+
+    /**
+     * Creates mocked task stats with a fresh vertex ID and random values between 1 and 1024.
+     */
+    private static TaskStateStats createTaskStateStats() {
+        ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+        TaskStateStats task = mock(TaskStateStats.class);
+        when(task.getJobVertexId()).thenReturn(new JobVertexID());
+        when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
+        when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
+        when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
+        when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
+        when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+        when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+        return task;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dec0d6bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
new file mode 100644
index 0000000..8274b36
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link CheckpointStatsHandler}.
+ *
+ * <p>The handler is given a fully mocked {@link CheckpointStatsSnapshot}. The JSON string it
+ * returns is parsed again and compared field by field against the values of the mocks.
+ */
+public class CheckpointStatsHandlerTest {
+
+ /**
+ * Tests a complete checkpoint stats snapshot.
+ *
+ * <p>The snapshot consists of counts, summary stats, the latest completed/savepoint/failed/
+ * restored checkpoints and a history with one in progress, one completed and one failed
+ * checkpoint.
+ */
+ @Test
+ public void testCheckpointStatsRequest() throws Exception {
+ // Counts
+ CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
+ when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
+ when(counts.getTotalNumberOfCheckpoints()).thenReturn(12981231203L);
+ when(counts.getNumberOfInProgressCheckpoints()).thenReturn(191919);
+ when(counts.getNumberOfCompletedCheckpoints()).thenReturn(882828200L);
+ when(counts.getNumberOfFailedCheckpoints()).thenReturn(99171510L);
+
+ // Summary
+ CompletedCheckpointStatsSummary summary = mock(CompletedCheckpointStatsSummary.class);
+
+ MinMaxAvgStats stateSizeSummary = mock(MinMaxAvgStats.class);
+ when(stateSizeSummary.getMinimum()).thenReturn(81238123L);
+ when(stateSizeSummary.getMaximum()).thenReturn(19919191999L);
+ when(stateSizeSummary.getAverage()).thenReturn(1133L);
+
+ MinMaxAvgStats durationSummary = mock(MinMaxAvgStats.class);
+ when(durationSummary.getMinimum()).thenReturn(1182L);
+ when(durationSummary.getMaximum()).thenReturn(88654L);
+ when(durationSummary.getAverage()).thenReturn(171L);
+
+ MinMaxAvgStats alignmentBufferedSummary = mock(MinMaxAvgStats.class);
+ when(alignmentBufferedSummary.getMinimum()).thenReturn(81818181899L);
+ when(alignmentBufferedSummary.getMaximum()).thenReturn(89999911118654L);
+ when(alignmentBufferedSummary.getAverage()).thenReturn(11203131L);
+
+ when(summary.getStateSizeStats()).thenReturn(stateSizeSummary);
+ when(summary.getEndToEndDurationStats()).thenReturn(durationSummary);
+ when(summary.getAlignmentBufferedStats()).thenReturn(alignmentBufferedSummary);
+
+ // Latest
+ CompletedCheckpointStats latestCompleted = mock(CompletedCheckpointStats.class);
+ when(latestCompleted.getCheckpointId()).thenReturn(1992139L);
+ when(latestCompleted.getTriggerTimestamp()).thenReturn(1919191900L);
+ when(latestCompleted.getLatestAckTimestamp()).thenReturn(1977791901L);
+ when(latestCompleted.getStateSize()).thenReturn(111939272822L);
+ when(latestCompleted.getEndToEndDuration()).thenReturn(121191L);
+ when(latestCompleted.getAlignmentBuffered()).thenReturn(1L);
+ when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
+
+ CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
+ when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+ when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
+ when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+ when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
+ when(latestSavepoint.getEndToEndDuration()).thenReturn(121191L);
+ // Stub the savepoint mock itself (not latestCompleted), otherwise the savepoint would
+ // report the mock default of 0 and the value stubbed for latestCompleted would be replaced.
+ when(latestSavepoint.getAlignmentBuffered()).thenReturn(182813L);
+ when(latestSavepoint.getExternalPath()).thenReturn("savepoint-external-path");
+
+ FailedCheckpointStats latestFailed = mock(FailedCheckpointStats.class);
+ when(latestFailed.getCheckpointId()).thenReturn(1112L);
+ when(latestFailed.getTriggerTimestamp()).thenReturn(12828L);
+ when(latestFailed.getLatestAckTimestamp()).thenReturn(1901L);
+ when(latestFailed.getFailureTimestamp()).thenReturn(11999976L);
+ when(latestFailed.getStateSize()).thenReturn(111L);
+ when(latestFailed.getEndToEndDuration()).thenReturn(12L);
+ when(latestFailed.getAlignmentBuffered()).thenReturn(2L);
+ when(latestFailed.getFailureMessage()).thenReturn("expected cause");
+
+ RestoredCheckpointStats latestRestored = mock(RestoredCheckpointStats.class);
+ when(latestRestored.getCheckpointId()).thenReturn(1199L);
+ when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
+ when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(latestRestored.getExternalPath()).thenReturn("restored savepoint path");
+
+ // History
+ CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+ List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+
+ PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
+ when(inProgress.getCheckpointId()).thenReturn(1992139L);
+ when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+ when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
+ when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
+ when(inProgress.getStateSize()).thenReturn(111939272822L);
+ when(inProgress.getEndToEndDuration()).thenReturn(121191L);
+ when(inProgress.getAlignmentBuffered()).thenReturn(1L);
+ when(inProgress.getNumberOfSubtasks()).thenReturn(501);
+ when(inProgress.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+ CompletedCheckpointStats completedSavepoint = mock(CompletedCheckpointStats.class);
+ when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
+ when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+ when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+ when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
+ when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
+ when(completedSavepoint.getStateSize()).thenReturn(1119822L);
+ when(completedSavepoint.getEndToEndDuration()).thenReturn(12191L);
+ when(completedSavepoint.getAlignmentBuffered()).thenReturn(111L);
+ when(completedSavepoint.getNumberOfSubtasks()).thenReturn(33501);
+ when(completedSavepoint.getNumberOfAcknowledgedSubtasks()).thenReturn(211);
+ when(completedSavepoint.isDiscarded()).thenReturn(true);
+ when(completedSavepoint.getExternalPath()).thenReturn("completed-external-path");
+
+ FailedCheckpointStats failed = mock(FailedCheckpointStats.class);
+ when(failed.getCheckpointId()).thenReturn(110719L);
+ when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+ when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+ when(failed.getTriggerTimestamp()).thenReturn(191900L);
+ when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
+ when(failed.getStateSize()).thenReturn(1119822L);
+ when(failed.getEndToEndDuration()).thenReturn(12191L);
+ when(failed.getAlignmentBuffered()).thenReturn(111L);
+ when(failed.getNumberOfSubtasks()).thenReturn(33501);
+ when(failed.getNumberOfAcknowledgedSubtasks()).thenReturn(1);
+ when(failed.getFailureTimestamp()).thenReturn(119230L);
+ when(failed.getFailureMessage()).thenReturn("failure message");
+
+ // The history entries are asserted below in exactly this insertion order
+ checkpoints.add(inProgress);
+ checkpoints.add(completedSavepoint);
+ checkpoints.add(failed);
+ when(history.getCheckpoints()).thenReturn(checkpoints);
+ when(history.getLatestCompletedCheckpoint()).thenReturn(latestCompleted);
+ when(history.getLatestSavepoint()).thenReturn(latestSavepoint);
+ when(history.getLatestFailedCheckpoint()).thenReturn(latestFailed);
+
+ // Snapshot that ties counts, summary, history and latest restored checkpoint together
+ CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+ when(snapshot.getCounts()).thenReturn(counts);
+ when(snapshot.getSummaryStats()).thenReturn(summary);
+ when(snapshot.getHistory()).thenReturn(history);
+ when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
+
+ AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
+ when(tracker.createSnapshot()).thenReturn(snapshot);
+
+ // Request the stats and parse the returned JSON
+ CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(json);
+
+ // Verify counts
+ JsonNode countNode = rootNode.get("counts");
+ assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong());
+ assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong());
+ assertEquals(counts.getNumberOfInProgressCheckpoints(), countNode.get("in_progress").asLong());
+ assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong());
+ assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong());
+
+ // Verify summary
+ JsonNode summaryNode = rootNode.get("summary");
+ JsonNode sizeSummaryNode = summaryNode.get("state_size");
+ assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong());
+ assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong());
+ assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong());
+
+ JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration");
+ assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong());
+ assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong());
+ assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong());
+
+ JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered");
+ assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong());
+ assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong());
+ assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong());
+
+ // Verify latest checkpoints
+ JsonNode latestNode = rootNode.get("latest");
+ JsonNode latestCheckpointNode = latestNode.get("completed");
+ assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong());
+ assertEquals(latestCompleted.getTriggerTimestamp(), latestCheckpointNode.get("trigger_timestamp").asLong());
+ assertEquals(latestCompleted.getLatestAckTimestamp(), latestCheckpointNode.get("latest_ack_timestamp").asLong());
+ assertEquals(latestCompleted.getStateSize(), latestCheckpointNode.get("state_size").asLong());
+ assertEquals(latestCompleted.getEndToEndDuration(), latestCheckpointNode.get("end_to_end_duration").asLong());
+ assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong());
+ assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText());
+
+ JsonNode latestSavepointNode = latestNode.get("savepoint");
+ assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong());
+ assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong());
+ assertEquals(latestSavepoint.getLatestAckTimestamp(), latestSavepointNode.get("latest_ack_timestamp").asLong());
+ assertEquals(latestSavepoint.getStateSize(), latestSavepointNode.get("state_size").asLong());
+ assertEquals(latestSavepoint.getEndToEndDuration(), latestSavepointNode.get("end_to_end_duration").asLong());
+ assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong());
+ assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText());
+
+ JsonNode latestFailedNode = latestNode.get("failed");
+ assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong());
+ assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong());
+ assertEquals(latestFailed.getLatestAckTimestamp(), latestFailedNode.get("latest_ack_timestamp").asLong());
+ assertEquals(latestFailed.getStateSize(), latestFailedNode.get("state_size").asLong());
+ assertEquals(latestFailed.getEndToEndDuration(), latestFailedNode.get("end_to_end_duration").asLong());
+ assertEquals(latestFailed.getAlignmentBuffered(), latestFailedNode.get("alignment_buffered").asLong());
+ assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong());
+ assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText());
+
+ JsonNode latestRestoredNode = latestNode.get("restored");
+ assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
+ assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
+ assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean());
+ assertEquals(latestRestored.getExternalPath(), latestRestoredNode.get("external_path").asText());
+
+ // Verify history (in progress, completed savepoint, failed)
+ JsonNode historyNode = rootNode.get("history");
+ Iterator<JsonNode> it = historyNode.iterator();
+
+ assertTrue(it.hasNext());
+ JsonNode inProgressNode = it.next();
+
+ assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
+ assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
+ assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean());
+ assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong());
+ assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong());
+ assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong());
+ assertEquals(inProgress.getEndToEndDuration(), inProgressNode.get("end_to_end_duration").asLong());
+ assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
+ assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
+ assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+
+ assertTrue(it.hasNext());
+ JsonNode completedSavepointNode = it.next();
+
+ assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
+ assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
+ assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean());
+ assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong());
+ assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong());
+ assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong());
+ assertEquals(completedSavepoint.getEndToEndDuration(), completedSavepointNode.get("end_to_end_duration").asLong());
+ assertEquals(completedSavepoint.getAlignmentBuffered(), completedSavepointNode.get("alignment_buffered").asLong());
+ assertEquals(completedSavepoint.getNumberOfSubtasks(), completedSavepointNode.get("num_subtasks").asInt());
+ assertEquals(completedSavepoint.getNumberOfAcknowledgedSubtasks(), completedSavepointNode.get("num_acknowledged_subtasks").asInt());
+
+ assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
+ assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+
+ assertTrue(it.hasNext());
+ JsonNode failedNode = it.next();
+
+ assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
+ assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
+ assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), failedNode.get("is_savepoint").asBoolean());
+ assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong());
+ assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong());
+ assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong());
+ assertEquals(failed.getEndToEndDuration(), failedNode.get("end_to_end_duration").asLong());
+ assertEquals(failed.getAlignmentBuffered(), failedNode.get("alignment_buffered").asLong());
+ assertEquals(failed.getNumberOfSubtasks(), failedNode.get("num_subtasks").asInt());
+ assertEquals(failed.getNumberOfAcknowledgedSubtasks(), failedNode.get("num_acknowledged_subtasks").asInt());
+
+ assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
+ assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
+
+ // No further history entries expected
+ assertFalse(it.hasNext());
+ }
+}