Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:16 UTC
[06/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
new file mode 100644
index 0000000..95d417a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A request handler that provides an overview of all TaskManagers or details for a single one.
+ */
+public class TaskManagersHandler extends AbstractJsonRequestHandler {
+
+ private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
+ private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
+
+ public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
+
+ private final Time timeout;
+
+ private final MetricFetcher fetcher;
+
+ public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+ super(executor);
+ this.timeout = requireNonNull(timeout);
+ this.fetcher = fetcher;
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ if (jobManagerGateway != null) {
+ // Whether the metrics of a single TaskManager or of all TaskManagers are
+ // requested, we return them in an array. This avoids unnecessary code complexity.
+ // If only one TaskManager is requested, we fetch only that TaskManager's metrics.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+ CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+ return tmInstanceFuture.thenApplyAsync(
+ (Optional<Instance> optTaskManager) -> {
+ try {
+ return writeTaskManagersJson(
+ optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+ pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ } else {
+ CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ return tmInstancesFuture.thenApplyAsync(
+ (Collection<Instance> taskManagers) -> {
+ try {
+ return writeTaskManagersJson(taskManagers, pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ }
+ }
+ else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+ }
+ }
+
+ private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeArrayFieldStart("taskmanagers");
+
+ for (Instance instance : instances) {
+ gen.writeStartObject();
+ gen.writeStringField("id", instance.getId().toString());
+ gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+ gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+ gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+ gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+ gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+ gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+ gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+ gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+ gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+ // only send metrics when the details of a single TaskManager are requested.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ fetcher.update();
+ MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+ if (metrics != null) {
+ gen.writeObjectFieldStart("metrics");
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+ gen.writeNumberField("heapCommitted", heapCommitted);
+ gen.writeNumberField("heapUsed", heapUsed);
+ gen.writeNumberField("heapMax", heapTotal);
+
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+ gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+ gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+ gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+ gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+ gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+ gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+ long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+ long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+ long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+ gen.writeNumberField("directCount", directCount);
+ gen.writeNumberField("directUsed", directUsed);
+ gen.writeNumberField("directMax", directMax);
+
+ long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+ long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+ long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+ gen.writeNumberField("mappedCount", mappedCount);
+ gen.writeNumberField("mappedUsed", mappedUsed);
+ gen.writeNumberField("mappedMax", mappedMax);
+
+ long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+ long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+ gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+ gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+ gen.writeArrayFieldStart("garbageCollectors");
+
+ for (String gcName : metrics.garbageCollectorNames) {
+ String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+ String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+ if (count != null && time != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", gcName);
+ gen.writeNumberField("count", Long.valueOf(count));
+ gen.writeNumberField("time", Long.valueOf(time));
+ gen.writeEndObject();
+ }
+ }
+
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
+ }
+
+ gen.writeEndObject();
+ }
+
+ gen.writeEndArray();
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+}
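As a rough illustration of how the JSON emitted by writeTaskManagersJson above might be consumed, here is a minimal client-side sketch using Jackson's ObjectMapper. The class name and the sample payload values are made up for the example; the payload shape follows the fields written by the handler.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Hypothetical consumer of the "taskmanagers" array produced by TaskManagersHandler. */
public class TaskManagersJsonExample {

    public static void main(String[] args) throws Exception {
        // Example payload in the shape produced by writeTaskManagersJson (values are made up).
        String json = "{\"taskmanagers\":[{\"id\":\"abc\",\"path\":\"akka://host/user/taskmanager\","
            + "\"dataPort\":6121,\"slotsNumber\":4,\"freeSlots\":2}]}";

        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(json);

        for (JsonNode tm : root.get("taskmanagers")) {
            // Free vs. total slots per TaskManager, as reported by the handler.
            System.out.println(tm.get("id").asText() + ": "
                + tm.get("freeSlots").asInt() + "/" + tm.get("slotsNumber").asInt() + " slots free");
        }
    }
}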
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
new file mode 100644
index 0000000..96bf7ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling running tasks. If a task is
+ * slowed down by back pressure it will be stuck in memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
+ * request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTracker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
+
+ /** Maximum stack trace depth for samples. */
+ static final int MAX_STACK_TRACE_DEPTH = 3;
+
+ /** Expected class name for back pressure indicating stack trace element. */
+ static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+ /** Expected method name for back pressure indicating stack trace element. */
+ static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+
+ /** Lock guarding trigger operations. */
+ private final Object lock = new Object();
+
+ /** Stack trace sample coordinator. */
+ private final StackTraceSampleCoordinator coordinator;
+
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID,
+ * because they are potentially constant across runs, which would mess up
+ * the cached data.
+ */
+ private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
+
+ /** Pending in-progress stats. Important: Job vertex IDs need to be scoped
+ * by job ID, because they are potentially constant across runs, which would
+ * mess up the cached data. */
+ private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+ /** Cleanup interval for completed stats cache. */
+ private final int cleanUpInterval;
+
+ private final int numSamples;
+
+ private final Time delayBetweenSamples;
+
+ /** Flag indicating whether the stats tracker has been shut down. */
+ private boolean shutDown;
+
+ /**
+ * Creates a back pressure statistics tracker.
+ *
+ * @param cleanUpInterval Clean up interval for completed stats.
+ * @param numSamples Number of stack trace samples when determining back pressure.
+ * @param delayBetweenSamples Delay between samples when determining back pressure.
+ */
+ public BackPressureStatsTracker(
+ StackTraceSampleCoordinator coordinator,
+ int cleanUpInterval,
+ int numSamples,
+ Time delayBetweenSamples) {
+
+ this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+
+ checkArgument(cleanUpInterval >= 0, "Clean up interval");
+ this.cleanUpInterval = cleanUpInterval;
+
+ checkArgument(numSamples >= 1, "Number of samples");
+ this.numSamples = numSamples;
+
+ this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+ this.operatorStatsCache = CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ /** Cleanup interval for completed stats cache. */
+ public long getCleanUpInterval() {
+ return cleanUpInterval;
+ }
+
+ /**
+ * Returns back pressure statistics for an operator.
+ *
+ * @param vertex Operator to get the stats for.
+ *
+ * @return Back pressure statistics for an operator
+ */
+ public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+ return Optional.ofNullable(operatorStatsCache.getIfPresent(vertex));
+ }
+
+ /**
+ * Triggers a stack trace sample for an operator to gather the back pressure
+ * statistics. If there is a sample in progress for the operator, the call
+ * is ignored.
+ *
+ * @param vertex Operator to get the stats for.
+ * @return Flag indicating whether a sample was triggered.
+ */
+ @SuppressWarnings("unchecked")
+ public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
+ synchronized (lock) {
+ if (shutDown) {
+ return false;
+ }
+
+ if (!pendingStats.contains(vertex) &&
+ !vertex.getGraph().getState().isGloballyTerminalState()) {
+
+ Executor executor = vertex.getGraph().getFutureExecutor();
+
+ // Only trigger if the job is still active
+ if (executor != null) {
+ pendingStats.add(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+ }
+
+ CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
+ vertex.getTaskVertices(),
+ numSamples,
+ delayBetweenSamples,
+ MAX_STACK_TRACE_DEPTH);
+
+ sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Cleans up the operator stats cache if it contains timed out entries.
+ *
+ * <p>The Guava cache only evicts entries as part of maintenance during normal
+ * operations. If this tracker is inactive, the cache will never be cleaned up.
+ */
+ public void cleanUpOperatorStatsCache() {
+ operatorStatsCache.cleanUp();
+ }
+
+ /**
+ * Shuts down the stats tracker.
+ *
+ * <p>Invalidates the cache and clears all pending stats.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!shutDown) {
+ operatorStatsCache.invalidateAll();
+ pendingStats.clear();
+
+ shutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Invalidates the cache (irrespective of clean up interval).
+ */
+ void invalidateOperatorStatsCache() {
+ operatorStatsCache.invalidateAll();
+ }
+
+ /**
+ * Callback on completed stack trace sample.
+ */
+ class StackTraceSampleCompletionCallback implements BiFunction<StackTraceSample, Throwable, Void> {
+
+ private final ExecutionJobVertex vertex;
+
+ public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ @Override
+ public Void apply(StackTraceSample stackTraceSample, Throwable throwable) {
+ synchronized (lock) {
+ try {
+ if (shutDown) {
+ return null;
+ }
+
+ // Job finished, ignore.
+ JobStatus jobState = vertex.getGraph().getState();
+ if (jobState.isGloballyTerminalState()) {
+ LOG.debug("Ignoring sample, because job is in state " + jobState + ".");
+ } else if (stackTraceSample != null) {
+ OperatorBackPressureStats stats = createStatsFromSample(stackTraceSample);
+ operatorStatsCache.put(vertex, stats);
+ } else {
+ LOG.debug("Failed to gather stack trace sample.", throwable);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error during stats completion.", t);
+ } finally {
+ pendingStats.remove(vertex);
+ }
+
+ return null;
+ }
+ }
+
+ /**
+ * Creates the back pressure stats from a stack trace sample.
+ *
+ * @param sample Stack trace sample to base stats on.
+ *
+ * @return Back pressure stats
+ */
+ private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
+
+ // Map task ID to subtask index, because the web interface expects
+ // it like that.
+ Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+ .newHashMapWithExpectedSize(traces.size());
+
+ Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet();
+
+ for (ExecutionVertex task : vertex.getTaskVertices()) {
+ ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
+ if (sampledTasks.contains(taskId)) {
+ subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
+ } else {
+ LOG.debug("Outdated sample. A task, which is part of the " +
+ "sample has been reset.");
+ }
+ }
+
+ // Ratio of blocked samples to total samples per sub task. Array
+ // position corresponds to sub task index.
+ double[] backPressureRatio = new double[traces.size()];
+
+ for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
+ int backPressureSamples = 0;
+
+ List<StackTraceElement[]> taskTraces = entry.getValue();
+
+ for (StackTraceElement[] trace : taskTraces) {
+ for (int i = trace.length - 1; i >= 0; i--) {
+ StackTraceElement elem = trace[i];
+
+ if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+ elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+
+ backPressureSamples++;
+ break; // Continue with next stack trace
+ }
+ }
+ }
+
+ int subtaskIndex = subtaskIndexMap.get(entry.getKey());
+
+ int size = taskTraces.size();
+ double ratio = (size > 0)
+ ? ((double) backPressureSamples) / size
+ : 0;
+
+ backPressureRatio[subtaskIndex] = ratio;
+ }
+
+ return new OperatorBackPressureStats(
+ sample.getSampleId(),
+ sample.getEndTime(),
+ backPressureRatio);
+ }
+ }
+}
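The ratio computation in createStatsFromSample above can be illustrated in isolation. The following is a minimal standalone sketch (class name is illustrative) that counts the sampled traces blocked in LocalBufferPool#requestBufferBlocking and divides by the total number of traces, mirroring the loop in the tracker.

import java.util.List;

/** Standalone sketch of the back pressure ratio computation (class name is illustrative). */
public class BackPressureRatioExample {

    static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
    static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";

    /** Returns the fraction of sampled traces that were blocked in the buffer request. */
    static double backPressureRatio(List<StackTraceElement[]> taskTraces) {
        int backPressureSamples = 0;
        for (StackTraceElement[] trace : taskTraces) {
            for (int i = trace.length - 1; i >= 0; i--) {
                StackTraceElement elem = trace[i];
                if (elem.getClassName().equals(EXPECTED_CLASS_NAME)
                        && elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
                    backPressureSamples++;
                    break; // count each trace at most once
                }
            }
        }
        return taskTraces.isEmpty() ? 0.0 : ((double) backPressureSamples) / taskTraces.size();
    }
}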
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
new file mode 100644
index 0000000..1a78a17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics of multiple tasks.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks. The
+ * back pressure ratio denotes the ratio of traces indicating back pressure
+ * to the total number of sampled traces.
+ */
+public class OperatorBackPressureStats {
+
+ /** ID of the corresponding sample. */
+ private final int sampleId;
+
+ /** End time stamp of the corresponding sample. */
+ private final long endTimestamp;
+
+ /** Back pressure ratio per subtask. */
+ private final double[] subTaskBackPressureRatio;
+
+ /** Maximum back pressure ratio. */
+ private final double maxSubTaskBackPressureRatio;
+
+ public OperatorBackPressureStats(
+ int sampleId,
+ long endTimestamp,
+ double[] subTaskBackPressureRatio) {
+
+ this.sampleId = sampleId;
+ this.endTimestamp = endTimestamp;
+ this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
+ checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified");
+
+ double max = 0;
+ for (double ratio : subTaskBackPressureRatio) {
+ if (ratio > max) {
+ max = ratio;
+ }
+ }
+
+ maxSubTaskBackPressureRatio = max;
+ }
+
+ /**
+ * Returns the ID of the sample.
+ *
+ * @return ID of the sample
+ */
+ public int getSampleId() {
+ return sampleId;
+ }
+
+ /**
+ * Returns the time stamp, when all stack traces were collected at the
+ * JobManager.
+ *
+ * @return Time stamp, when all stack traces were collected at the
+ * JobManager
+ */
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ /**
+ * Returns the number of sub tasks.
+ *
+ * @return Number of sub tasks.
+ */
+ public int getNumberOfSubTasks() {
+ return subTaskBackPressureRatio.length;
+ }
+
+ /**
+ * Returns the ratio of stack traces indicating back pressure to total
+ * number of sampled stack traces.
+ *
+ * @param index Subtask index.
+ *
+ * @return Ratio of stack traces indicating back pressure to total number
+ * of sampled stack traces.
+ */
+ public double getBackPressureRatio(int index) {
+ return subTaskBackPressureRatio[index];
+ }
+
+ /**
+ * Returns the maximum back pressure ratio of all sub tasks.
+ *
+ * @return Maximum back pressure ratio of all sub tasks.
+ */
+ public double getMaxBackPressureRatio() {
+ return maxSubTaskBackPressureRatio;
+ }
+
+ @Override
+ public String toString() {
+ return "OperatorBackPressureStats{" +
+ "sampleId=" + sampleId +
+ ", endTimestamp=" + endTimestamp +
+ ", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
new file mode 100644
index 0000000..dda4e33
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A sample of stack traces for one or more tasks.
+ *
+ * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSample {
+
+ /** ID of this sample (unique per job). */
+ private final int sampleId;
+
+ /** Time stamp, when the sample was triggered. */
+ private final long startTime;
+
+ /** Time stamp, when all stack traces were collected at the JobManager. */
+ private final long endTime;
+
+ /** Map of stack traces by execution ID. */
+ private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+
+ /**
+ * Creates a stack trace sample.
+ *
+ * @param sampleId ID of the sample.
+ * @param startTime Time stamp, when the sample was triggered.
+ * @param endTime Time stamp, when all stack traces were
+ * collected at the JobManager.
+ * @param stackTracesByTask Map of stack traces by execution ID.
+ */
+ public StackTraceSample(
+ int sampleId,
+ long startTime,
+ long endTime,
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
+
+ checkArgument(sampleId >= 0, "Negative sample ID");
+ checkArgument(startTime >= 0, "Negative start time");
+ checkArgument(endTime >= startTime, "End time before start time");
+
+ this.sampleId = sampleId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
+ }
+
+ /**
+ * Returns the ID of the sample.
+ *
+ * @return ID of the sample
+ */
+ public int getSampleId() {
+ return sampleId;
+ }
+
+ /**
+ * Returns the time stamp, when the sample was triggered.
+ *
+ * @return Time stamp, when the sample was triggered
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Returns the time stamp, when all stack traces were collected at the
+ * JobManager.
+ *
+ * @return Time stamp, when all stack traces were collected at the
+ * JobManager
+ */
+ public long getEndTime() {
+ return endTime;
+ }
+
+ /**
+ * Returns the map of stack traces by execution ID.
+ *
+ * @return Map of stack traces by execution ID
+ */
+ public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
+ return stackTracesByTask;
+ }
+
+ @Override
+ public String toString() {
+ return "StackTraceSample{" +
+ "sampleId=" + sampleId +
+ ", startTime=" + startTime +
+ ", endTime=" + endTime +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
new file mode 100644
index 0000000..8c2ec6e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting stack traces of running tasks.
+ */
+public class StackTraceSampleCoordinator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
+
+ private static final int NUM_GHOST_SAMPLE_IDS = 10;
+
+ private final Object lock = new Object();
+
+ /** Executor used to run the futures. */
+ private final Executor executor;
+
+ /** Time out after the expected sampling duration. */
+ private final long sampleTimeout;
+
+ /** In progress samples (guarded by lock). */
+ private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
+
+ /** A list of recent sample IDs to identify late messages vs. invalid ones. */
+ private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
+
+ /** Sample ID counter (guarded by lock). */
+ private int sampleIdCounter;
+
+ /**
+ * Flag indicating whether the coordinator is still running (guarded by
+ * lock).
+ */
+ private boolean isShutDown;
+
+ /**
+ * Creates a new coordinator for the job.
+ *
+ * @param executor to use to execute the futures
+ * @param sampleTimeout Time out after the expected sampling duration.
+ * This is added to the expected duration of a
+ * sample, which is determined by the number of
+ * samples and the delay between each sample.
+ */
+ public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) {
+ checkArgument(sampleTimeout >= 0L);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.sampleTimeout = sampleTimeout;
+ }
+
+ /**
+ * Triggers a stack trace sample to all tasks.
+ *
+ * @param tasksToSample Tasks to sample.
+ * @param numSamples Number of stack trace samples to collect.
+ * @param delayBetweenSamples Delay between consecutive samples.
+ * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+ * no maximum and keeps the complete stack trace.
+ * @return A future of the completed stack trace sample
+ */
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<StackTraceSample> triggerStackTraceSample(
+ ExecutionVertex[] tasksToSample,
+ int numSamples,
+ Time delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ checkNotNull(tasksToSample, "Tasks to sample");
+ checkArgument(tasksToSample.length >= 1, "No tasks to sample");
+ checkArgument(numSamples >= 1, "No number of samples");
+ checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+ // Execution IDs of running tasks
+ ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
+ Execution[] executions = new Execution[tasksToSample.length];
+
+ // Check that all tasks are RUNNING before triggering anything. The
+ // triggering can still fail.
+ for (int i = 0; i < triggerIds.length; i++) {
+ Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
+ if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+ executions[i] = execution;
+ triggerIds[i] = execution.getAttemptId();
+ } else {
+ return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i]
+ .getTaskNameWithSubtaskIndex() + " is not running."));
+ }
+ }
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+ }
+
+ final int sampleId = sampleIdCounter++;
+
+ LOG.debug("Triggering stack trace sample {}", sampleId);
+
+ final PendingStackTraceSample pending = new PendingStackTraceSample(
+ sampleId, triggerIds);
+
+ // Discard the sample if it takes too long. We don't send cancel
+ // messages to the task managers, but only wait for the responses
+ // and then ignore them.
+ long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+ Time timeout = Time.milliseconds(expectedDuration + sampleTimeout);
+
+ // Add the pending sample before scheduling the discard task to
+ // prevent races with removing it again.
+ pendingSamples.put(sampleId, pending);
+
+ // Trigger all samples
+ for (Execution execution: executions) {
+ final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(
+ sampleId,
+ numSamples,
+ delayBetweenSamples,
+ maxStackTraceDepth,
+ timeout);
+
+ stackTraceSampleFuture.handleAsync(
+ (StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> {
+ if (stackTraceSampleResponse != null) {
+ collectStackTraces(
+ stackTraceSampleResponse.getSampleId(),
+ stackTraceSampleResponse.getExecutionAttemptID(),
+ stackTraceSampleResponse.getSamples());
+ } else {
+ cancelStackTraceSample(sampleId, throwable);
+ }
+
+ return null;
+ },
+ executor);
+ }
+
+ return pending.getStackTraceSampleFuture();
+ }
+ }
+
+ /**
+ * Cancels a pending sample.
+ *
+ * @param sampleId ID of the sample to cancel.
+ * @param cause Cause of the cancellation (can be <code>null</code>).
+ */
+ public void cancelStackTraceSample(int sampleId, Throwable cause) {
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ PendingStackTraceSample sample = pendingSamples.remove(sampleId);
+ if (sample != null) {
+ if (cause != null) {
+ LOG.info("Cancelling sample " + sampleId, cause);
+ } else {
+ LOG.info("Cancelling sample {}", sampleId);
+ }
+
+ sample.discard(cause);
+ rememberRecentSampleId(sampleId);
+ }
+ }
+ }
+
+ /**
+ * Shuts down the coordinator.
+ *
+ * <p>After shut down, no further operations are executed.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!isShutDown) {
+ LOG.info("Shutting down stack trace sample coordinator.");
+
+ for (PendingStackTraceSample pending : pendingSamples.values()) {
+ pending.discard(new RuntimeException("Shut down"));
+ }
+
+ pendingSamples.clear();
+
+ isShutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Collects stack traces of a task.
+ *
+ * @param sampleId ID of the sample.
+ * @param executionId ID of the sampled task.
+ * @param stackTraces Stack traces of the sampled task.
+ *
+ * @throws IllegalStateException If the sample ID is unknown and does not
+ * belong to a recently finished or cancelled sample.
+ */
+ public void collectStackTraces(
+ int sampleId,
+ ExecutionAttemptID executionId,
+ List<StackTraceElement[]> stackTraces) {
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
+ }
+
+ PendingStackTraceSample pending = pendingSamples.get(sampleId);
+
+ if (pending != null) {
+ pending.collectStackTraces(executionId, stackTraces);
+
+ // Publish the sample
+ if (pending.isComplete()) {
+ pendingSamples.remove(sampleId);
+ rememberRecentSampleId(sampleId);
+
+ pending.completePromiseAndDiscard();
+ }
+ } else if (recentPendingSamples.contains(sampleId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received late stack trace sample {} of task {}",
+ sampleId, executionId);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unknown sample ID " + sampleId);
+ }
+ }
+ }
+ }
+
+ private void rememberRecentSampleId(int sampleId) {
+ if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
+ recentPendingSamples.removeFirst();
+ }
+ recentPendingSamples.addLast(sampleId);
+ }
+
+ int getNumberOfPendingSamples() {
+ synchronized (lock) {
+ return pendingSamples.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pending stack trace sample, which collects stack traces and owns a
+ * {@link StackTraceSample} promise.
+ *
+ * <p>Access pending sample in lock scope.
+ */
+ private static class PendingStackTraceSample {
+
+ private final int sampleId;
+ private final long startTime;
+ private final Set<ExecutionAttemptID> pendingTasks;
+ private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+ private final CompletableFuture<StackTraceSample> stackTraceFuture;
+
+ private boolean isDiscarded;
+
+ PendingStackTraceSample(
+ int sampleId,
+ ExecutionAttemptID[] tasksToCollect) {
+
+ this.sampleId = sampleId;
+ this.startTime = System.currentTimeMillis();
+ this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
+ this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
+ this.stackTraceFuture = new CompletableFuture<>();
+ }
+
+ int getSampleId() {
+ return sampleId;
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+
+ boolean isDiscarded() {
+ return isDiscarded;
+ }
+
+ boolean isComplete() {
+ if (isDiscarded) {
+ throw new IllegalStateException("Discarded");
+ }
+
+ return pendingTasks.isEmpty();
+ }
+
+ void discard(Throwable cause) {
+ if (!isDiscarded) {
+ pendingTasks.clear();
+ stackTracesByTask.clear();
+
+ stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause));
+
+ isDiscarded = true;
+ }
+ }
+
+ void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
+ if (isDiscarded) {
+ throw new IllegalStateException("Discarded");
+ }
+
+ if (pendingTasks.remove(executionId)) {
+ stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
+ } else if (isComplete()) {
+ throw new IllegalStateException("Completed");
+ } else {
+ throw new IllegalArgumentException("Unknown task " + executionId);
+ }
+ }
+
+ void completePromiseAndDiscard() {
+ if (isComplete()) {
+ isDiscarded = true;
+
+ long endTime = System.currentTimeMillis();
+
+ StackTraceSample stackTraceSample = new StackTraceSample(
+ sampleId,
+ startTime,
+ endTime,
+ stackTracesByTask);
+
+ stackTraceFuture.complete(stackTraceSample);
+ } else {
+ throw new IllegalStateException("Not completed yet");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<StackTraceSample> getStackTraceSampleFuture() {
+ return stackTraceFuture;
+ }
+ }
+}
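The recentPendingSamples deque above lets collectStackTraces distinguish late responses for recently finished or cancelled samples from responses with genuinely unknown IDs. A minimal standalone sketch of that bounded-deque bookkeeping (class and method names are illustrative):

import java.util.ArrayDeque;

/** Illustrative sketch of the "ghost sample ID" bookkeeping used by StackTraceSampleCoordinator. */
public class RecentSampleIdsExample {

    private static final int NUM_GHOST_SAMPLE_IDS = 10;

    private final ArrayDeque<Integer> recentIds = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);

    /** Remembers a finished/cancelled sample ID, evicting the oldest entry once the bound is reached. */
    void remember(int sampleId) {
        if (recentIds.size() >= NUM_GHOST_SAMPLE_IDS) {
            recentIds.removeFirst();
        }
        recentIds.addLast(sampleId);
    }

    /** A response for a remembered ID is merely late (harmless); anything else is unknown. */
    boolean isLate(int sampleId) {
        return recentIds.contains(sampleId);
    }
}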
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..2086628
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns a job's snapshotting settings.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+ private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
+
+ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{CHECKPOINT_CONFIG_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointConfigJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint config json.", e);
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Archivist for the CheckpointConfigHandler.
+ */
+ public static class CheckpointConfigJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createCheckpointConfigJson(graph);
+ String path = CHECKPOINT_CONFIG_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
+ private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
+
+ if (settings == null) {
+ return "{}";
+ }
+
+ gen.writeStartObject();
+ {
+ gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
+ gen.writeNumberField("interval", settings.getCheckpointInterval());
+ gen.writeNumberField("timeout", settings.getCheckpointTimeout());
+ gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
+ gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
+
+ ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
+ gen.writeObjectFieldStart("externalization");
+ {
+ if (externalization.externalizeCheckpoints()) {
+ gen.writeBooleanField("enabled", true);
+ gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+ } else {
+ gen.writeBooleanField("enabled", false);
+ }
+ }
+ gen.writeEndObject();
+
+ }
+ gen.writeEndObject();
+
+ gen.close();
+
+ return writer.toString();
+ }
+}
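The handler builds its response with Jackson's streaming JsonGenerator, obtained above from the Flink-internal JsonFactory.JACKSON_FACTORY constant. Below is a minimal sketch of the same pattern using a plain com.fasterxml.jackson.core.JsonFactory (assumed to behave like the shared constant), showing the pairing of writeObjectFieldStart and writeEndObject used for the nested "externalization" object.

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import java.io.StringWriter;

/** Minimal sketch of the streaming-JSON pattern used by CheckpointConfigHandler. */
public class CheckpointConfigJsonExample {

    public static void main(String[] args) throws Exception {
        StringWriter writer = new StringWriter();
        JsonGenerator gen = new JsonFactory().createGenerator(writer);

        gen.writeStartObject();
        gen.writeStringField("mode", "exactly_once");
        gen.writeNumberField("interval", 10_000L);

        // Nested object: every writeObjectFieldStart must be paired with a writeEndObject.
        gen.writeObjectFieldStart("externalization");
        gen.writeBooleanField("enabled", false);
        gen.writeEndObject();

        gen.writeEndObject();
        gen.close();

        System.out.println(writer.toString());
        // {"mode":"exactly_once","interval":10000,"externalization":{"enabled":false}}
    }
}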
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..f21fc76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example, if you manage to click on the last
+ * checkpoint in the history, it would otherwise no longer be available via
+ * the stats as soon as another checkpoint is triggered. With the cache in
+ * place, the checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+ @Nullable
+ private final Cache<Long, AbstractCheckpointStats> cache;
+
+ public CheckpointStatsCache(int maxNumEntries) {
+ if (maxNumEntries > 0) {
+ this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+ .maximumSize(maxNumEntries)
+ .build();
+ } else {
+ this.cache = null;
+ }
+ }
+
+ /**
+ * Try to add the checkpoint to the cache.
+ *
+ * @param checkpoint Checkpoint to be added.
+ */
+ void tryAdd(AbstractCheckpointStats checkpoint) {
+ // Don't add in progress checkpoints as they will be replaced by their
+ // completed/failed version eventually.
+ if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
+ cache.put(checkpoint.getCheckpointId(), checkpoint);
+ }
+ }
+
+ /**
+ * Try to look up a checkpoint by its ID in the cache.
+ *
+ * @param checkpointId ID of the checkpoint to look up.
+ * @return The checkpoint or <code>null</code> if checkpoint not found.
+ */
+ AbstractCheckpointStats tryGet(long checkpointId) {
+ if (cache != null) {
+ return cache.getIfPresent(checkpointId);
+ } else {
+ return null;
+ }
+ }
+
+}
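CheckpointStatsCache wraps a size-bounded Guava cache. The following is a minimal sketch of the same pattern using vanilla (unshaded) Guava instead of Flink's shaded flink.shaded.guava18 package, with made-up keys and values.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/** Sketch of the size-bounded cache pattern used by CheckpointStatsCache. */
public class StatsCacheExample {

    public static void main(String[] args) {
        Cache<Long, String> cache = CacheBuilder.newBuilder()
            .maximumSize(2)              // keep at most two entries, evicting roughly least-recently-used
            .build();

        cache.put(1L, "checkpoint-1");
        cache.put(2L, "checkpoint-2");
        cache.put(3L, "checkpoint-3");   // may evict the least recently accessed entry

        // getIfPresent returns null for evicted or never-inserted keys, mirroring tryGet above.
        System.out.println(cache.getIfPresent(3L));
        System.out.println(cache.getIfPresent(1L));
    }
}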
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
new file mode 100644
index 0000000..61ebeda
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the details for a single checkpoint.
+ */
+public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+ private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid";
+
+ private final CheckpointStatsCache cache;
+
+ public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
+ this.cache = cache;
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ long checkpointId = parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return "{}";
+ }
+
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
+
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return "{}";
+ }
+ }
+
+ try {
+ return createCheckpointDetailsJson(checkpoint);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint details json.", e);
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Archivist for the CheckpointStatsDetailsHandler.
+ */
+ public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+ if (stats == null) {
+ return Collections.emptyList();
+ }
+ CheckpointStatsHistory history = stats.getHistory();
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ String json = createCheckpointDetailsJson(checkpoint);
+ String path = CHECKPOINT_STATS_DETAILS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
+ archive.add(new ArchivedJson(path, json));
+ }
+ return archive;
+ }
+ }
+
+ public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ gen.writeStartObject();
+
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+ if (checkpoint.getStatus().isCompleted()) {
+ // --- Completed ---
+ CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+
+ gen.writeBooleanField("discarded", completed.isDiscarded());
+ }
+ else if (checkpoint.getStatus().isFailed()) {
+ // --- Failed ---
+ FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ }
+
+ gen.writeObjectFieldStart("tasks");
+ for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+ gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
+
+ gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", taskStats.getStateSize());
+ gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+ gen.writeEndObject();
+ }
+ gen.writeEndObject();
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+ /**
+ * Returns the checkpoint ID parsed from the provided parameters.
+ *
+ * @param params Path parameters
+ * @return Parsed checkpoint ID or <code>-1</code> if not available.
+ */
+ static long parseCheckpointId(Map<String, String> params) {
+ String param = params.get("checkpointid");
+ if (param == null) {
+ return -1;
+ }
+
+ try {
+ return Long.parseLong(param);
+ } catch (NumberFormatException ignored) {
+ return -1;
+ }
+ }
+}
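For orientation, a hypothetical client-side sketch (not part of this commit) of how the JSON produced by createCheckpointDetailsJson could be consumed with Jackson. The field names are taken from the generator calls above; the class name and the sample payload are invented for illustration.

    // Hypothetical sketch: read a checkpoint details response produced by this handler.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class CheckpointDetailsClientSketch {
        public static void main(String[] args) throws Exception {
            String json = "{\"id\":42,\"status\":\"COMPLETED\",\"state_size\":1024,\"tasks\":{}}";
            JsonNode root = new ObjectMapper().readTree(json);
            System.out.println("checkpoint " + root.path("id").asLong()
                + " is " + root.path("status").asText()
                + ", state size " + root.path("state_size").asLong() + " bytes");
            // Per-vertex details are keyed by the job vertex id under "tasks".
            root.path("tasks").fields().forEachRemaining(task ->
                System.out.println(task.getKey() + " acknowledged "
                    + task.getValue().path("num_acknowledged_subtasks").asInt() + " subtasks"));
        }
    }

Note that "external_path", "discarded", "failure_timestamp" and "failure_message" are only written for completed respectively failed checkpoints, so clients should treat them as optional.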
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..22a8db2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
+
+ private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
+ private final CheckpointStatsCache cache;
+
+ public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
+ this.cache = checkNotNull(cache);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
+ return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+ }
+
+ @Override
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return CompletableFuture.completedFuture("{}");
+ }
+
+ JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
+ if (vertexId == null) {
+ return CompletableFuture.completedFuture("{}");
+ }
+
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return CompletableFuture.completedFuture("{}");
+ }
+
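+ // Same lookup strategy as in CheckpointStatsDetailsHandler: snapshot history first, then the cache.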
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return CompletableFuture.completedFuture("{}");
+ }
+ }
+
+ TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
+ if (taskStats == null) {
+ return CompletableFuture.completedFuture("{}");
+ }
+
+ try {
+ return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
+ /**
+ * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+ */
+ public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+ if (stats == null) {
+ return Collections.emptyList();
+ }
+ CheckpointStatsHistory history = stats.getHistory();
+ List<ArchivedJson> archive = new ArrayList<>();
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) {
+ String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+ String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString())
+ .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()))
+ .replace(":vertexid", subtaskStats.getJobVertexId().toString());
+ archive.add(new ArchivedJson(path, json));
+ }
+ }
+ return archive;
+ }
+ }
+
+ private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+ // Overview
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", taskStats.getStateSize());
+ gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+ if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+ gen.writeObjectFieldStart("summary");
+ gen.writeObjectFieldStart("state_size");
+ CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("end_to_end_duration");
+ MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
+ gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("checkpoint_duration");
+ gen.writeObjectFieldStart("sync");
+ CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+ gen.writeEndObject();
+ gen.writeObjectFieldStart("async");
+ CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment");
+ gen.writeObjectFieldStart("buffered");
+ CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
+ gen.writeEndObject();
+ gen.writeObjectFieldStart("duration");
+ CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+ gen.writeEndObject();
+ }
+
+ SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
+
+ gen.writeArrayFieldStart("subtasks");
+ for (int i = 0; i < subtasks.length; i++) {
+ SubtaskStateStats subtask = subtasks[i];
+
+ gen.writeStartObject();
+ gen.writeNumberField("index", i);
+
+ if (subtask != null) {
+ gen.writeStringField("status", "completed");
+ gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
+ gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+ gen.writeNumberField("state_size", subtask.getStateSize());
+
+ gen.writeObjectFieldStart("checkpoint");
+ gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
+ gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment");
+ gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
+ gen.writeNumberField("duration", subtask.getAlignmentDuration());
+ gen.writeEndObject();
+ } else {
+ gen.writeStringField("status", "pending_or_failed");
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+}
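Similarly, a hypothetical sketch (names and payload invented) of walking the "subtasks" array written by createSubtaskCheckpointDetailsJson; per the code above, an entry is either marked "completed" with timing fields or "pending_or_failed" with no further details.

    // Hypothetical sketch: separate acknowledged from pending/failed subtasks in the response.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SubtaskCheckpointDetailsSketch {
        public static void main(String[] args) throws Exception {
            String json = "{\"id\":7,\"status\":\"IN_PROGRESS\",\"subtasks\":["
                + "{\"index\":0,\"status\":\"completed\",\"end_to_end_duration\":120},"
                + "{\"index\":1,\"status\":\"pending_or_failed\"}]}";
            for (JsonNode subtask : new ObjectMapper().readTree(json).path("subtasks")) {
                if ("completed".equals(subtask.path("status").asText())) {
                    System.out.println("subtask " + subtask.path("index").asInt()
                        + " took " + subtask.path("end_to_end_duration").asLong() + " ms");
                } else {
                    System.out.println("subtask " + subtask.path("index").asInt() + " has not acknowledged yet");
                }
            }
        }
    }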
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..abb353e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ */
+public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
+
+ private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
+
+ public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
+ }
+
+ @Override
+ public String[] getPaths() {
+ return new String[]{CHECKPOINT_STATS_REST_PATH};
+ }
+
+ @Override
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointStatsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Archivist for the CheckpointStatsHandler.
+ */
+ public static class CheckpointStatsJsonArchivist implements JsonArchivist {
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+ String json = createCheckpointStatsJson(graph);
+ String path = CHECKPOINT_STATS_REST_PATH
+ .replace(":jobid", graph.getJobID().toString());
+ return Collections.singletonList(new ArchivedJson(path, json));
+ }
+ }
+
+ private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+
+ // Counts
+ writeCounts(gen, snapshot.getCounts());
+
+ // Summary
+ writeSummary(gen, snapshot.getSummaryStats());
+
+ CheckpointStatsHistory history = snapshot.getHistory();
+
+ // Latest
+ writeLatestCheckpoints(
+ gen,
+ history.getLatestCompletedCheckpoint(),
+ history.getLatestSavepoint(),
+ history.getLatestFailedCheckpoint(),
+ snapshot.getLatestRestoredCheckpoint());
+
+ // History
+ writeHistory(gen, snapshot.getHistory());
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+
+ private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+ gen.writeObjectFieldStart("counts");
+ gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
+ gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
+ gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
+ gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
+ gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+ gen.writeEndObject();
+ }
+
+ private static void writeSummary(
+ JsonGenerator gen,
+ CompletedCheckpointStatsSummary summary) throws IOException {
+ gen.writeObjectFieldStart("summary");
+ gen.writeObjectFieldStart("state_size");
+ writeMinMaxAvg(gen, summary.getStateSizeStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("end_to_end_duration");
+ writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+ gen.writeEndObject();
+
+ gen.writeObjectFieldStart("alignment_buffered");
+ writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+ gen.writeEndObject();
+ gen.writeEndObject();
+ }
+
+ static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+ gen.writeNumberField("min", minMaxAvg.getMinimum());
+ gen.writeNumberField("max", minMaxAvg.getMaximum());
+ gen.writeNumberField("avg", minMaxAvg.getAverage());
+ }
+
+ private static void writeLatestCheckpoints(
+ JsonGenerator gen,
+ @Nullable CompletedCheckpointStats completed,
+ @Nullable CompletedCheckpointStats savepoint,
+ @Nullable FailedCheckpointStats failed,
+ @Nullable RestoredCheckpointStats restored) throws IOException {
+
+ gen.writeObjectFieldStart("latest");
+ // Completed checkpoint
+ if (completed != null) {
+ gen.writeObjectFieldStart("completed");
+ writeCheckpoint(gen, completed);
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", completed.getExternalPath());
+ }
+
+ gen.writeEndObject();
+ }
+
+ // Completed savepoint
+ if (savepoint != null) {
+ gen.writeObjectFieldStart("savepoint");
+ writeCheckpoint(gen, savepoint);
+
+ String externalPath = savepoint.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", savepoint.getExternalPath());
+ }
+ gen.writeEndObject();
+ }
+
+ // Failed checkpoint
+ if (failed != null) {
+ gen.writeObjectFieldStart("failed");
+ writeCheckpoint(gen, failed);
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ gen.writeEndObject();
+ }
+
+ // Restored checkpoint
+ if (restored != null) {
+ gen.writeObjectFieldStart("restored");
+ gen.writeNumberField("id", restored.getCheckpointId());
+ gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
+ gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
+
+ String externalPath = restored.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndObject();
+ }
+
+ private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+ }
+
+ private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+ gen.writeArrayFieldStart("history");
+ for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+ gen.writeStartObject();
+ gen.writeNumberField("id", checkpoint.getCheckpointId());
+ gen.writeStringField("status", checkpoint.getStatus().toString());
+ gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
+ gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+ gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+ gen.writeNumberField("state_size", checkpoint.getStateSize());
+ gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+ gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+ gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+ gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+ if (checkpoint.getStatus().isCompleted()) {
+ // --- Completed ---
+ CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+ String externalPath = completed.getExternalPath();
+ if (externalPath != null) {
+ gen.writeStringField("external_path", externalPath);
+ }
+
+ gen.writeBooleanField("discarded", completed.isDiscarded());
+ }
+ else if (checkpoint.getStatus().isFailed()) {
+ // --- Failed ---
+ FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+ gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+ String failureMsg = failed.getFailureMessage();
+ if (failureMsg != null) {
+ gen.writeStringField("failure_message", failureMsg);
+ }
+ }
+
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ }
+}
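Finally, a hypothetical snippet (not part of this commit) reading the "counts" section of the job-level checkpoint stats response written by writeCounts; the counter names come from the code above, everything else is illustrative.

    // Hypothetical sketch: extract the checkpoint counters from the stats response.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class CheckpointCountsSketch {
        public static void main(String[] args) throws Exception {
            String json = "{\"counts\":{\"restored\":1,\"total\":10,\"in_progress\":0,\"completed\":9,\"failed\":1}}";
            JsonNode counts = new ObjectMapper().readTree(json).path("counts");
            System.out.printf("completed=%d, failed=%d, in progress=%d%n",
                counts.path("completed").asLong(),
                counts.path("failed").asLong(),
                counts.path("in_progress").asLong());
        }
    }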