You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:48:47 UTC
[11/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint
stats tracking
[FLINK-4410] [runtime] Rework checkpoint stats tracking
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d1f4bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d1f4bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d1f4bcb
Branch: refs/heads/release-1.2
Commit: 0d1f4bcbb37c5aa18f7bfcb886d3914b2f680bf0
Parents: 6ea77ed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 23 20:37:08 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Jan 10 09:47:55 2017 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 10 +-
.../jobmanager/JMXJobManagerMetricTest.java | 2 +-
.../checkpoint/AbstractCheckpointStats.java | 192 ++++++++
.../checkpoint/CheckpointCoordinator.java | 43 +-
.../checkpoint/CheckpointProperties.java | 59 ++-
.../checkpoint/CheckpointStatsCounts.java | 184 ++++++++
.../checkpoint/CheckpointStatsHistory.java | 386 ++++++++++++++++
.../checkpoint/CheckpointStatsSnapshot.java | 102 +++++
.../checkpoint/CheckpointStatsStatus.java | 62 +++
.../checkpoint/CheckpointStatsTracker.java | 447 +++++++++++++++++++
.../runtime/checkpoint/CompletedCheckpoint.java | 18 +
.../checkpoint/CompletedCheckpointStats.java | 174 ++++++++
.../CompletedCheckpointStatsSummary.java | 107 +++++
.../checkpoint/FailedCheckpointStats.java | 153 +++++++
.../runtime/checkpoint/MinMaxAvgStats.java | 130 ++++++
.../runtime/checkpoint/PendingCheckpoint.java | 97 +++-
.../checkpoint/PendingCheckpointStats.java | 190 ++++++++
.../checkpoint/RestoredCheckpointStats.java | 103 +++++
.../flink/runtime/checkpoint/SubtaskState.java | 37 +-
.../runtime/checkpoint/SubtaskStateStats.java | 176 ++++++++
.../runtime/checkpoint/TaskStateStats.java | 277 ++++++++++++
.../savepoint/SavepointV1Serializer.java | 10 +-
.../executiongraph/AccessExecutionGraph.java | 2 +-
.../AccessExecutionJobVertex.java | 10 +-
.../executiongraph/ArchivedExecutionGraph.java | 7 +-
.../ArchivedExecutionJobVertex.java | 14 +-
.../runtime/executiongraph/ExecutionGraph.java | 32 +-
.../executiongraph/ExecutionGraphBuilder.java | 30 +-
.../executiongraph/ExecutionJobVertex.java | 13 -
.../jobgraph/tasks/JobSnapshottingSettings.java | 17 +-
.../checkpoint/AcknowledgeCheckpoint.java | 17 +-
.../checkpoint/CheckpointCoordinatorTest.java | 182 +++++---
.../checkpoint/CheckpointPropertiesTest.java | 27 ++
.../checkpoint/CheckpointStateRestoreTest.java | 6 +-
.../checkpoint/CheckpointStatsCountsTest.java | 153 +++++++
.../checkpoint/CheckpointStatsHistoryTest.java | 196 ++++++++
.../checkpoint/CheckpointStatsStatusTest.java | 48 ++
.../checkpoint/CheckpointStatsTrackerTest.java | 327 ++++++++++++++
.../CompletedCheckpointStatsSummaryTest.java | 105 +++++
.../CompletedCheckpointStoreTest.java | 2 +-
.../checkpoint/CompletedCheckpointTest.java | 25 ++
.../checkpoint/CoordinatorShutdownTest.java | 4 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 5 +-
.../checkpoint/FailedCheckpointStatsTest.java | 60 +++
.../runtime/checkpoint/MinMaxAvgStatsTest.java | 96 ++++
.../checkpoint/PendingCheckpointStatsTest.java | 256 +++++++++++
.../checkpoint/PendingCheckpointTest.java | 70 ++-
.../checkpoint/RestoredCheckpointStatsTest.java | 49 ++
.../checkpoint/SubtaskStateStatsTest.java | 57 +++
.../runtime/checkpoint/TaskStateStatsTest.java | 93 ++++
.../checkpoint/savepoint/SavepointV1Test.java | 3 +-
.../ArchivedExecutionGraphTest.java | 146 ++----
.../jobmanager/JobManagerHARecoveryTest.java | 5 +-
.../runtime/jobmanager/JobManagerTest.java | 12 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 15 +-
.../messages/CheckpointMessagesTest.java | 3 +-
.../runtime/jobmanager/JobManagerITCase.scala | 21 +-
.../api/graph/StreamingJobGraphGenerator.java | 17 +-
58 files changed, 4679 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 8a9d594..eabb754 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -588,7 +588,12 @@ public final class ConfigConstants {
/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";
- /** Flag to disable checkpoint stats. */
+ /**
+ * Flag to disable checkpoint stats.
+ *
+ * @deprecated Not possible to disable any longer. Use history size of 0.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
/** Config parameter defining the number of checkpoints to remember for recent history. */
@@ -1226,7 +1231,8 @@ public final class ConfigConstants {
/** By default, submitting jobs from the web-frontend is allowed. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
- /** Default flag to disable checkpoint stats. */
+ /** Config key has been deprecated. Therefore, no default value required. */
+ @Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
/** Default number of checkpoints to remember for recent history. */
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 3ae224f..b3b7dfc 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest {
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
- 500, 500, 50, 5, ExternalizedCheckpointSettings.none()));
+ 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));
flink.waitForActorsToBeAlive();
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
new file mode 100644
index 0000000..6c261a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for checkpoint statistics.
+ *
+ * <p>Holds the data common to all checkpoint stats (ID, trigger timestamp,
+ * properties, number of subtasks and per-task stats) and derives the latest
+ * ack timestamp and end-to-end duration from the latest acknowledged subtask
+ * reported by the concrete subclass.
+ */
+public abstract class AbstractCheckpointStats {
+
+ /** ID of this checkpoint. */
+ final long checkpointId;
+
+ /** Timestamp when the checkpoint was triggered at the coordinator. */
+ final long triggerTimestamp;
+
+ /** {@link TaskStateStats} accessible by their ID. */
+ final Map<JobVertexID, TaskStateStats> taskStats;
+
+ /** Total number of subtasks over all tasks. */
+ final int numberOfSubtasks;
+
+ /** Properties of the checkpoint. */
+ final CheckpointProperties props;
+
+ /**
+ * Creates the common part of the checkpoint stats.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param triggerTimestamp Timestamp when the checkpoint was triggered at the coordinator.
+ * @param props Properties of the checkpoint (must not be <code>null</code>).
+ * @param numberOfSubtasks Total number of subtasks over all tasks (must be positive).
+ * @param taskStats Task state stats by job vertex ID (must not be <code>null</code>
+ * or empty). The map is referenced, not copied.
+ * @throws NullPointerException If <code>taskStats</code> or <code>props</code> is <code>null</code>.
+ * @throws IllegalArgumentException If <code>taskStats</code> is empty or
+ * <code>numberOfSubtasks</code> is not positive.
+ */
+ AbstractCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int numberOfSubtasks,
+ Map<JobVertexID, TaskStateStats> taskStats) {
+
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.taskStats = checkNotNull(taskStats);
+ checkArgument(taskStats.size() > 0, "Empty task stats");
+ checkArgument(numberOfSubtasks > 0, "Non-positive number of subtasks");
+ this.numberOfSubtasks = numberOfSubtasks;
+ this.props = checkNotNull(props);
+ }
+
+ /**
+ * Returns the status of this checkpoint.
+ *
+ * @return Status of this checkpoint
+ */
+ public abstract CheckpointStatsStatus getStatus();
+
+ /**
+ * Returns the number of acknowledged subtasks.
+ *
+ * @return The number of acknowledged subtasks.
+ */
+ public abstract int getNumberOfAcknowledgedSubtasks();
+
+ /**
+ * Returns the total checkpoint state size over all subtasks.
+ *
+ * @return Total checkpoint state size over all subtasks.
+ */
+ public abstract long getStateSize();
+
+ /**
+ * Returns the total buffered bytes during alignment over all subtasks.
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Total buffered bytes during alignment over all subtasks.
+ */
+ public abstract long getAlignmentBuffered();
+
+ /**
+ * Returns the latest acknowledged subtask stats or <code>null</code> if
+ * none was acknowledged yet.
+ *
+ * @return Latest acknowledged subtask stats or <code>null</code>
+ */
+ @Nullable
+ public abstract SubtaskStateStats getLatestAcknowledgedSubtaskStats();
+
+ /**
+ * Returns the ID of this checkpoint.
+ *
+ * @return ID of this checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Returns the timestamp when the checkpoint was triggered.
+ *
+ * @return Timestamp when the checkpoint was triggered.
+ */
+ public long getTriggerTimestamp() {
+ return triggerTimestamp;
+ }
+
+ /**
+ * Returns the properties of this checkpoint.
+ *
+ * @return Properties of this checkpoint.
+ */
+ public CheckpointProperties getProperties() {
+ return props;
+ }
+
+ /**
+ * Returns the total number of subtasks involved in this checkpoint.
+ *
+ * @return Total number of subtasks involved in this checkpoint.
+ */
+ public int getNumberOfSubtasks() {
+ return numberOfSubtasks;
+ }
+
+ /**
+ * Returns the task state stats for the given job vertex ID or
+ * <code>null</code> if no task with such an ID is available.
+ *
+ * @param jobVertexId Job vertex ID of the task stats to look up.
+ * @return The task state stats instance for the given ID or <code>null</code>.
+ */
+ @Nullable
+ public TaskStateStats getTaskStateStats(JobVertexID jobVertexId) {
+ return taskStats.get(jobVertexId);
+ }
+
+ /**
+ * Returns all task state stats instances.
+ *
+ * <p>The returned collection is a read-only view backed by the internal
+ * map, so callers cannot remove task stats from this checkpoint.
+ *
+ * @return Read-only view of all task state stats instances.
+ */
+ public Collection<TaskStateStats> getAllTaskStateStats() {
+ return Collections.unmodifiableCollection(taskStats.values());
+ }
+
+ /**
+ * Returns the ack timestamp of the latest acknowledged subtask or
+ * <code>-1</code> if none was acknowledged yet.
+ *
+ * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code>.
+ */
+ public long getLatestAckTimestamp() {
+ SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+ if (subtask != null) {
+ return subtask.getAckTimestamp();
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the duration of this checkpoint calculated as the time since
+ * triggering until the latest acknowledged subtask or <code>-1</code> if
+ * no subtask was acknowledged yet.
+ *
+ * @return Duration of this checkpoint or <code>-1</code> if no subtask was acknowledged yet.
+ */
+ public long getEndToEndDuration() {
+ SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+ if (subtask != null) {
+ // Clamped at 0 so an ack timestamp before the trigger timestamp
+ // (presumably due to clock differences) never yields a negative duration.
+ return Math.max(0, subtask.getAckTimestamp() - triggerTimestamp);
+ } else {
+ return -1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3ce7a5a..9132897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -40,6 +39,7 @@ import org.apache.flink.runtime.state.TaskStateHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
@@ -147,8 +147,9 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown;
- /** Helper for tracking checkpoint statistics */
- private final CheckpointStatsTracker statsTracker;
+ /** Optional tracker for checkpoint statistics. */
+ @Nullable
+ private CheckpointStatsTracker statsTracker;
/** Default checkpoint properties **/
private final CheckpointProperties checkpointProperties;
@@ -170,7 +171,6 @@ public class CheckpointCoordinator {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
String checkpointDirectory,
- CheckpointStatsTracker statsTracker,
Executor executor) {
// sanity checks
@@ -209,7 +209,6 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
- this.statsTracker = checkNotNull(statsTracker);
this.timer = new Timer("Checkpoint Timer", true);
@@ -231,6 +230,15 @@ public class CheckpointCoordinator {
this.executor = checkNotNull(executor);
}
+ /**
+ * Sets the checkpoint stats tracker.
+ *
+ * @param statsTracker The checkpoint stats tracker.
+ */
+ public void setCheckpointStatsTracker(@Nullable CheckpointStatsTracker statsTracker) {
+ this.statsTracker = statsTracker;
+ }
+
// --------------------------------------------------------------------------------------------
// Clean shutdown
// --------------------------------------------------------------------------------------------
@@ -428,11 +436,19 @@ public class CheckpointCoordinator {
checkpointID,
timestamp,
ackTasks,
- isPeriodic,
props,
targetDirectory,
executor);
+ if (statsTracker != null) {
+ PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
+ checkpointID,
+ timestamp,
+ props);
+
+ checkpoint.setStatsCallback(callback);
+ }
+
// schedule the timer that will clean up the expired checkpoints
TimerTask canceller = new TimerTask() {
@Override
@@ -632,7 +648,7 @@ public class CheckpointCoordinator {
if (checkpoint != null && !checkpoint.isDiscarded()) {
- switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
+ switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
@@ -769,8 +785,6 @@ public class CheckpointCoordinator {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
-
- statsTracker.onCompletedCheckpoint(completedCheckpoint);
}
private void rememberRecentCheckpointId(long id) {
@@ -876,6 +890,17 @@ public class CheckpointCoordinator {
stateAssignmentOperation.assignStates();
+ if (statsTracker != null) {
+ long restoreTimestamp = System.currentTimeMillis();
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(
+ latest.getCheckpointID(),
+ latest.getProperties(),
+ restoreTimestamp,
+ latest.getExternalPath());
+
+ statsTracker.reportRestoredCheckpoint(restored);
+ }
+
return true;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 68a4998..4d8bab2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -179,7 +179,6 @@ public class CheckpointProperties implements Serializable {
// ------------------------------------------------------------------------
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -227,6 +226,42 @@ public class CheckpointProperties implements Serializable {
// ------------------------------------------------------------------------
+ private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties(
+ true,
+ true,
+ false,
+ false,
+ false,
+ false,
+ false);
+
+ private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties(
+ false,
+ false,
+ true,
+ true,
+ true,
+ true,
+ true);
+
+ private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+ false,
+ true,
+ true,
+ true,
+ false, // Retain on cancellation
+ false,
+ false); // Retain on suspension
+
+ private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+ false,
+ true,
+ true,
+ true,
+ true, // Delete on cancellation
+ false,
+ true); // Delete on suspension
+
/**
* Creates the checkpoint properties for a (manually triggered) savepoint.
*
@@ -236,7 +271,7 @@ public class CheckpointProperties implements Serializable {
* @return Checkpoint properties for a (manually triggered) savepoint.
*/
public static CheckpointProperties forStandardSavepoint() {
- return new CheckpointProperties(true, true, false, false, false, false, false);
+ return STANDARD_SAVEPOINT;
}
/**
@@ -248,7 +283,7 @@ public class CheckpointProperties implements Serializable {
* @return Checkpoint properties for a regular checkpoint.
*/
public static CheckpointProperties forStandardCheckpoint() {
- return new CheckpointProperties(false, false, true, true, true, true, true);
+ return STANDARD_CHECKPOINT;
}
/**
@@ -264,8 +299,20 @@ public class CheckpointProperties implements Serializable {
* @return Checkpoint properties for an external checkpoint.
*/
public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
- // Handle suspension like cancellation as graceful cluster shut down
- // suspends all jobs (non-HA).
- return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation);
+ if (deleteOnCancellation) {
+ return EXTERNALIZED_CHECKPOINT_DELETED;
+ } else {
+ return EXTERNALIZED_CHECKPOINT_RETAINED;
+ }
+ }
+
+ /**
+ * Returns whether the checkpoint properties describe a standard savepoint.
+ *
+ * @param props Checkpoint properties to check.
+ * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
+ */
+ public static boolean isSavepoint(CheckpointProperties props) {
+ return STANDARD_SAVEPOINT.equals(props);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
new file mode 100644
index 0000000..dad45eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Counts of checkpoints.
+ *
+ * <p>Tracks the number of restored checkpoints and the number of total, in
+ * progress, completed and failed checkpoints. Every completed or failed
+ * checkpoint is expected to have been counted as in progress before.
+ */
+public class CheckpointStatsCounts implements Serializable {
+
+ private static final long serialVersionUID = -5229425063269482528L;
+
+ /** Number of restored checkpoints. */
+ private long numRestoredCheckpoints;
+
+ /** Number of total checkpoints (in progress, completed, failed). */
+ private long numTotalCheckpoints;
+
+ /** Number of in progress checkpoints. */
+ private int numInProgressCheckpoints;
+
+ /** Number of successfully completed checkpoints. */
+ private long numCompletedCheckpoints;
+
+ /** Number of failed checkpoints. */
+ private long numFailedCheckpoints;
+
+ /**
+ * Creates the initial zero checkpoint counts.
+ */
+ CheckpointStatsCounts() {
+ this(0, 0, 0, 0, 0);
+ }
+
+ /**
+ * Creates the checkpoint counts with the given counts.
+ *
+ * @param numRestoredCheckpoints Number of restored checkpoints.
+ * @param numTotalCheckpoints Number of total checkpoints (in progress, completed, failed).
+ * @param numInProgressCheckpoints Number of in progress checkpoints.
+ * @param numCompletedCheckpoints Number of successfully completed checkpoints.
+ * @param numFailedCheckpoints Number of failed checkpoints.
+ * @throws IllegalArgumentException If any of the counts is negative.
+ */
+ private CheckpointStatsCounts(
+ long numRestoredCheckpoints,
+ long numTotalCheckpoints,
+ int numInProgressCheckpoints,
+ long numCompletedCheckpoints,
+ long numFailedCheckpoints) {
+
+ checkArgument(numRestoredCheckpoints >= 0, "Negative number of restored checkpoints");
+ checkArgument(numTotalCheckpoints >= 0, "Negative total number of checkpoints");
+ checkArgument(numInProgressCheckpoints >= 0, "Negative number of in progress checkpoints");
+ checkArgument(numCompletedCheckpoints >= 0, "Negative number of completed checkpoints");
+ checkArgument(numFailedCheckpoints >= 0, "Negative number of failed checkpoints");
+
+ this.numRestoredCheckpoints = numRestoredCheckpoints;
+ this.numTotalCheckpoints = numTotalCheckpoints;
+ this.numInProgressCheckpoints = numInProgressCheckpoints;
+ this.numCompletedCheckpoints = numCompletedCheckpoints;
+ this.numFailedCheckpoints = numFailedCheckpoints;
+ }
+
+ /**
+ * Returns the number of restored checkpoints.
+ *
+ * @return Number of restored checkpoints.
+ */
+ public long getNumberOfRestoredCheckpoints() {
+ return numRestoredCheckpoints;
+ }
+
+ /**
+ * Returns the total number of checkpoints (in progress, completed, failed).
+ *
+ * @return Total number of checkpoints.
+ */
+ public long getTotalNumberOfCheckpoints() {
+ return numTotalCheckpoints;
+ }
+
+ /**
+ * Returns the number of in progress checkpoints.
+ *
+ * @return Number of in progress checkpoints.
+ */
+ public int getNumberOfInProgressCheckpoints() {
+ return numInProgressCheckpoints;
+ }
+
+ /**
+ * Returns the number of completed checkpoints.
+ *
+ * @return Number of completed checkpoints.
+ */
+ public long getNumberOfCompletedCheckpoints() {
+ return numCompletedCheckpoints;
+ }
+
+ /**
+ * Returns the number of failed checkpoints.
+ *
+ * @return Number of failed checkpoints.
+ */
+ public long getNumberOfFailedCheckpoints() {
+ return numFailedCheckpoints;
+ }
+
+ /**
+ * Increments the number of restored checkpoints.
+ */
+ void incrementRestoredCheckpoints() {
+ numRestoredCheckpoints++;
+ }
+
+ /**
+ * Increments the number of total and in progress checkpoints.
+ */
+ void incrementInProgressCheckpoints() {
+ numInProgressCheckpoints++;
+ numTotalCheckpoints++;
+ }
+
+ /**
+ * Increments the number of successfully completed checkpoints.
+ *
+ * <p>It is expected that this follows a previous call to
+ * {@link #incrementInProgressCheckpoints()}.
+ *
+ * @throws IllegalStateException If there is no in progress checkpoint. The
+ * counts are left unchanged in this case.
+ */
+ void incrementCompletedCheckpoints() {
+ decrementInProgressCheckpoints("completed");
+ numCompletedCheckpoints++;
+ }
+
+ /**
+ * Increments the number of failed checkpoints.
+ *
+ * <p>It is expected that this follows a previous call to
+ * {@link #incrementInProgressCheckpoints()}.
+ *
+ * @throws IllegalStateException If there is no in progress checkpoint. The
+ * counts are left unchanged in this case.
+ */
+ void incrementFailedCheckpoints() {
+ decrementInProgressCheckpoints("failed");
+ numFailedCheckpoints++;
+ }
+
+ /**
+ * Decrements the number of in progress checkpoints.
+ *
+ * <p>The check happens before the decrement so that a rejected call does
+ * not leave a negative in progress count behind, which would make every
+ * later {@link #createSnapshot()} fail.
+ *
+ * @param counterName Name of the counter being incremented (for the error message).
+ * @throws IllegalStateException If there is no in progress checkpoint.
+ */
+ private void decrementInProgressCheckpoints(String counterName) {
+ if (numInProgressCheckpoints <= 0) {
+ throw new IllegalStateException("Incremented the " + counterName + " number of checkpoints " +
+ "without incrementing the in progress checkpoints before.");
+ }
+ numInProgressCheckpoints--;
+ }
+
+ /**
+ * Creates a snapshot of the current state.
+ *
+ * @return Snapshot of the current state.
+ */
+ CheckpointStatsCounts createSnapshot() {
+ return new CheckpointStatsCounts(
+ numRestoredCheckpoints,
+ numTotalCheckpoints,
+ numInProgressCheckpoints,
+ numCompletedCheckpoints,
+ numFailedCheckpoints);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
new file mode 100644
index 0000000..56fc9c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An array based history of checkpoint stats.
+ *
+ * <p>The size of the array is constrained by the maximum allowed size. The
+ * array starts empty an grows with each added checkpoint until it reaches
+ * the maximum number of elements. At this point, the elements wrap around
+ * and the least recently added entry is overwritten.
+ *
+ * <p>Access happens via an checkpointsIterable over the statistics and a map that
+ * exposes the checkpoint by their ID. Both of these are only guaranteed
+ * to reflect the latest state after a call to {@link #createSnapshot()}.
+ *
+ * <p>Furthermore the history tracks the latest completed and latest failed
+ * checkpoint as well as the latest savepoint.
+ */
+public class CheckpointStatsHistory implements Serializable {
+
+ private static final long serialVersionUID = 7090320677606528415L;
+
+ /** Iterable over all available stats. Only updated on {@link #createSnapshot()}. */
+ private final Iterable<AbstractCheckpointStats> checkpointsIterable;
+
+ /** Map of all available stats keyed by their ID. Only updated on {@link #createSnapshot()}. */
+ private final Map<Long, AbstractCheckpointStats> checkpointsById;
+
+ /** Maximum array size. */
+ private final int maxSize;
+
+ /** Flag indicating whether this the history is read-only. */
+ private final boolean readOnly;
+
+ /** Array of checkpointsArray. Writes go aginst this array. */
+ private transient AbstractCheckpointStats[] checkpointsArray;
+
+ /** Next position in {@link #checkpointsArray} to write to. */
+ private transient int nextPos;
+
+ /** The latest successfully completed checkpoint. */
+ @Nullable
+ private CompletedCheckpointStats latestCompletedCheckpoint;
+
+ /** The latest failed checkpoint. */
+ @Nullable
+ private FailedCheckpointStats latestFailedCheckpoint;
+
+ /** The latest successfully completed savepoint. */
+ @Nullable
+ private CompletedCheckpointStats latestSavepoint;
+
+ /**
+ * Creates a writeable checkpoint history with the given maximum size.
+ *
+ * <p>The read views are only updated on calls to {@link #createSnapshot()}.
+ * Initially they are empty.
+ *
+ * @param maxSize Maximum history size.
+ */
+ CheckpointStatsHistory(int maxSize) {
+ this(
+ false,
+ maxSize,
+ new AbstractCheckpointStats[0],
+ Collections.<AbstractCheckpointStats>emptyList(),
+ Collections.<Long, AbstractCheckpointStats>emptyMap(),
+ null,
+ null,
+ null);
+ }
+
+ /**
+ * Creates a checkpoint history with the given maximum size and state.
+ *
+ * <p>The read views are only updated on calls to {@link #createSnapshot()}.
+ * Initially they are empty.
+ *
+ * @param readOnly Flag indicating whether the history is read-only.
+ * @param maxSize Maximum history size.
+ * @param checkpointsIterable Checkpoints iterable.
+ * @param checkpointsById Checkpoints by ID.
+ */
+ private CheckpointStatsHistory(
+ boolean readOnly,
+ int maxSize,
+ AbstractCheckpointStats[] checkpointArray,
+ Iterable<AbstractCheckpointStats> checkpointsIterable,
+ Map<Long, AbstractCheckpointStats> checkpointsById,
+ CompletedCheckpointStats latestCompletedCheckpoint,
+ FailedCheckpointStats latestFailedCheckpoint,
+ CompletedCheckpointStats latestSavepoint) {
+
+ this.readOnly = readOnly;
+ checkArgument(maxSize >= 0, "Negative maximum size");
+ this.maxSize = maxSize;
+ this.checkpointsArray = checkpointArray;
+ this.checkpointsIterable = checkNotNull(checkpointsIterable);
+ this.checkpointsById = checkNotNull(checkpointsById);
+ this.latestCompletedCheckpoint = latestCompletedCheckpoint;
+ this.latestFailedCheckpoint = latestFailedCheckpoint;
+ this.latestSavepoint = latestSavepoint;
+ }
+
+ public Iterable<AbstractCheckpointStats> getCheckpoints() {
+ return checkpointsIterable;
+ }
+
+ public AbstractCheckpointStats getCheckpointById(long checkpointId) {
+ return checkpointsById.get(checkpointId);
+ }
+
+ @Nullable
+ public CompletedCheckpointStats getLatestCompletedCheckpoint() {
+ return latestCompletedCheckpoint;
+ }
+
+ @Nullable
+ public FailedCheckpointStats getLatestFailedCheckpoint() {
+ return latestFailedCheckpoint;
+ }
+
+ @Nullable
+ public CompletedCheckpointStats getLatestSavepoint() {
+ return latestSavepoint;
+ }
+
+ /**
+ * Creates a snapshot of the current state.
+ *
+ * @return Snapshot of the current state.
+ */
+ CheckpointStatsHistory createSnapshot() {
+ if (readOnly) {
+ throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
+ }
+
+ Iterable<AbstractCheckpointStats> checkpointsIterable;
+ Map<Long, AbstractCheckpointStats> checkpointsById;
+
+ checkpointsById = new HashMap<>(checkpointsArray.length);
+
+ if (maxSize == 0) {
+ checkpointsIterable = Collections.emptyList();
+ } else {
+ // Create snapshot iterator (copies the array)
+ checkpointsIterable = new CheckpointsStatsHistoryIterable(checkpointsArray, nextPos);
+
+ for (AbstractCheckpointStats checkpoint : checkpointsIterable) {
+ checkpointsById.put(checkpoint.getCheckpointId(), checkpoint);
+ }
+ }
+
+ if (latestCompletedCheckpoint != null) {
+ checkpointsById.put(latestCompletedCheckpoint.getCheckpointId(), latestCompletedCheckpoint);
+ }
+
+ if (latestFailedCheckpoint != null) {
+ checkpointsById.put(latestFailedCheckpoint.getCheckpointId(), latestFailedCheckpoint);
+ }
+
+ if (latestSavepoint != null) {
+ checkpointsById.put(latestSavepoint.getCheckpointId(), latestSavepoint);
+ }
+
+ return new CheckpointStatsHistory(
+ true,
+ maxSize,
+ null,
+ checkpointsIterable,
+ checkpointsById,
+ latestCompletedCheckpoint,
+ latestFailedCheckpoint,
+ latestSavepoint);
+ }
+
+ /**
+ * Adds an in progress checkpoint to the checkpoint history.
+ *
+ * @param pending In progress checkpoint to add.
+ */
+ void addInProgressCheckpoint(PendingCheckpointStats pending) {
+ if (readOnly) {
+ throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
+ }
+
+ if (maxSize == 0) {
+ return;
+ }
+
+ checkNotNull(pending, "Pending checkpoint");
+
+ // Grow the array if required. This happens only for the first entries
+ // and makes the iterator logic easier, because we don't have any
+ // null elements with the growing array.
+ if (checkpointsArray.length < maxSize) {
+ checkpointsArray = Arrays.copyOf(checkpointsArray, checkpointsArray.length + 1);
+ }
+
+ // Wrap around if we are at the end. The next pos is the least recently
+ // added checkpoint.
+ if (nextPos == checkpointsArray.length) {
+ nextPos = 0;
+ }
+
+ checkpointsArray[nextPos++] = pending;
+ }
+
+ /**
+ * Searches for the in progress checkpoint with the given ID and replaces
+ * it with the given completed or failed checkpoint.
+ *
+ * <p>This is bounded by the maximum number of concurrent in progress
+ * checkpointsArray, which means that the runtime of this is constant.
+ *
+ * @param completedOrFailed The completed or failed checkpoint to replace the in progress checkpoint with.
+ * @return <code>true</code> if the checkpoint was replaced or <code>false</code> otherwise.
+ */
+ boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed) {
+ checkArgument(!completedOrFailed.getStatus().isInProgress(), "Not allowed to replace with in progress checkpoints.");
+
+ if (readOnly) {
+ throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
+ }
+
+ // Update the latest checkpoint stats
+ if (completedOrFailed.getStatus().isCompleted()) {
+ CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed;
+ if (CheckpointProperties.isSavepoint(completed.getProperties()) &&
+ (latestSavepoint == null ||
+ completed.getCheckpointId() > latestSavepoint.getCheckpointId())) {
+
+ latestSavepoint = completed;
+ } else if (latestCompletedCheckpoint == null ||
+ completed.getCheckpointId() > latestCompletedCheckpoint.getCheckpointId()) {
+
+ latestCompletedCheckpoint = completed;
+ }
+ } else if (completedOrFailed.getStatus().isFailed()) {
+ FailedCheckpointStats failed = (FailedCheckpointStats) completedOrFailed;
+ if (latestFailedCheckpoint == null ||
+ failed.getCheckpointId() > latestFailedCheckpoint.getCheckpointId()) {
+
+ latestFailedCheckpoint = failed;
+ }
+ }
+
+ if (maxSize == 0) {
+ return false;
+ }
+
+ long checkpointId = completedOrFailed.getCheckpointId();
+
+ // We start searching from the last inserted position. Since the entries
+ // wrap around the array we search until we are at index 0 and then from
+ // the end of the array until (start pos + 1).
+ int startPos = nextPos == checkpointsArray.length ? checkpointsArray.length - 1 : nextPos - 1;
+
+ for (int i = startPos; i >= 0; i--) {
+ if (checkpointsArray[i].getCheckpointId() == checkpointId) {
+ checkpointsArray[i] = completedOrFailed;
+ return true;
+ }
+ }
+
+ for (int i = checkpointsArray.length - 1; i > startPos; i--) {
+ if (checkpointsArray[i].getCheckpointId() == checkpointId) {
+ checkpointsArray[i] = completedOrFailed;
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Iterable over the current checkpoint history.
+ *
+ * <p>The iteration order is in reverse insertion order.
+ */
+ private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
+
+ /** Copy of the checkpointsArray array at the point when this iterable was created. */
+ private final AbstractCheckpointStats[] checkpointsArray;
+
+ /** The starting position from which to iterate over the array. */
+ private final int startPos;
+
+ /**
+ * Creates the iterable by creating a copy of the checkpoints array.
+ *
+ * @param checkpointsArray Checkpoints to iterate over. This array is copied.
+ * @param nextPos The next write position for the array
+ */
+ CheckpointsStatsHistoryIterable(AbstractCheckpointStats[] checkpointsArray, int nextPos) {
+ // Copy the array
+ this.checkpointsArray = Arrays.copyOf(checkpointsArray, checkpointsArray.length);
+
+ // Start from nextPos, because that's were the oldest element is
+ this.startPos = nextPos == checkpointsArray.length ? checkpointsArray.length - 1 : nextPos - 1;
+ }
+
+ @Override
+ public Iterator<AbstractCheckpointStats> iterator() {
+ return new CheckpointsSnapshotIterator();
+ }
+
+ /**
+ * Iterator over the checkpoints array.
+ */
+ private class CheckpointsSnapshotIterator implements Iterator<AbstractCheckpointStats> {
+
+ /** The current position. */
+ private int currentPos;
+
+ /** The remaining number of elements to iterate over. */
+ private int remaining;
+
+ /**
+ * Creates the iterator.
+ */
+ CheckpointsSnapshotIterator() {
+ this.currentPos = startPos;
+ this.remaining = checkpointsArray.length;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return remaining > 0;
+ }
+
+ @Override
+ public AbstractCheckpointStats next() {
+ if (hasNext()) {
+ AbstractCheckpointStats stats = checkpointsArray[currentPos--];
+
+ // Wrap around if needed
+ if (currentPos == -1) {
+ currentPos = checkpointsArray.length - 1;
+ }
+
+ remaining--;
+
+ return stats;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
new file mode 100644
index 0000000..e0bfed7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A snapshot of the checkpoint stats.
+ */
+public class CheckpointStatsSnapshot implements Serializable {
+
+ private static final long serialVersionUID = 8914278419087217964L;
+
+ /** Snapshot of the checkpoint counts. */
+ private final CheckpointStatsCounts counts;
+
+ /** Snapshot of the completed checkpoints summary stats. */
+ private final CompletedCheckpointStatsSummary summary;
+
+ /** Snapshot of the checkpoint history. */
+ private final CheckpointStatsHistory history;
+
+ /** The latest restored checkpoint operation. */
+ @Nullable
+ private final RestoredCheckpointStats latestRestoredCheckpoint;
+
+ /**
+ * Creates a stats snapshot.
+ *
+ * @param counts Snapshot of the checkpoint counts.
+ * @param summary Snapshot of the completed checkpoints summary stats.
+ * @param history Snapshot of the checkpoint history.
+ * @param latestRestoredCheckpoint The latest restored checkpoint operation.
+ */
+ CheckpointStatsSnapshot(
+ CheckpointStatsCounts counts,
+ CompletedCheckpointStatsSummary summary,
+ CheckpointStatsHistory history,
+ @Nullable RestoredCheckpointStats latestRestoredCheckpoint) {
+
+ this.counts = checkNotNull(counts);
+ this.summary= checkNotNull(summary);
+ this.history = checkNotNull(history);
+ this.latestRestoredCheckpoint = latestRestoredCheckpoint;
+ }
+
+ /**
+ * Returns the snapshotted checkpoint counts.
+ *
+ * @return Snapshotted checkpoint counts.
+ */
+ public CheckpointStatsCounts getCounts() {
+ return counts;
+ }
+
+ /**
+ * Returns the snapshotted completed checkpoint summary stats.
+ *
+ * @return Snapshotted completed checkpoint summary stats.
+ */
+ public CompletedCheckpointStatsSummary getSummaryStats() {
+ return summary;
+ }
+
+ /**
+ * Returns the snapshotted checkpoint history.
+ *
+ * @return Snapshotted checkpoint history.
+ */
+ public CheckpointStatsHistory getHistory() {
+ return history;
+ }
+
+ /**
+ * Returns the latest restored checkpoint.
+ *
+ * @return Latest restored checkpoint or <code>null</code>.
+ */
+ public RestoredCheckpointStats getLatestRestoredCheckpoint() {
+ return latestRestoredCheckpoint;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
new file mode 100644
index 0000000..670a6e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
/**
 * Status of a tracked checkpoint. A checkpoint is reported as
 * {@link #IN_PROGRESS} first and later as either {@link #COMPLETED}
 * or {@link #FAILED}.
 */
public enum CheckpointStatsStatus {

	/** The checkpoint has been triggered but has not finished yet. */
	IN_PROGRESS,

	/** The checkpoint finished successfully. */
	COMPLETED,

	/** The checkpoint did not finish successfully. */
	FAILED;

	/**
	 * Checks whether this status denotes a checkpoint that is still running.
	 *
	 * @return <code>true</code> for {@link #IN_PROGRESS}, <code>false</code> otherwise.
	 */
	public boolean isInProgress() {
		return is(IN_PROGRESS);
	}

	/**
	 * Checks whether this status denotes a successfully finished checkpoint.
	 *
	 * @return <code>true</code> for {@link #COMPLETED}, <code>false</code> otherwise.
	 */
	public boolean isCompleted() {
		return is(COMPLETED);
	}

	/**
	 * Checks whether this status denotes an unsuccessful checkpoint.
	 *
	 * @return <code>true</code> for {@link #FAILED}, <code>false</code> otherwise.
	 */
	public boolean isFailed() {
		return is(FAILED);
	}

	/** Identity comparison shared by the status predicates above. */
	private boolean is(CheckpointStatsStatus expected) {
		return expected == this;
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
new file mode 100644
index 0000000..92f707f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tracker for checkpoint statistics.
+ *
+ * <p>This is tightly integrated with the {@link CheckpointCoordinator} in
+ * order to ease the gathering of fine-grained statistics.
+ *
+ * <p>The tracked stats include summary counts, a detailed history of recent
+ * and in progress checkpoints as well as summaries about the size, duration
+ * and more of recent checkpoints.
+ *
+ * <p>Data is gathered via callbacks in the {@link CheckpointCoordinator} and
+ * related classes like {@link PendingCheckpoint} and {@link CompletedCheckpoint},
+ * which receive the raw stats data in the first place.
+ *
+ * <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
+ * both the web frontend and the {@link Metric} system.
+ */
+public class CheckpointStatsTracker implements Serializable {
+
+ private static final long serialVersionUID = 1694085244807339288L;
+
+ /**
+ * Lock used to update stats and creating snapshots. Updates always happen
+ * from a single Thread at a time and there can be multiple concurrent read
+ * accesses to the latest stats snapshot.
+ *
+ * Currently, writes are executed by whatever Thread executes the coordinator
+ * actions (which already happens in locked scope). Reads can come from
+ * multiple concurrent Netty event loop Threads of the web runtime monitor.
+ */
+ private final ReentrantLock statsReadWriteLock = new ReentrantLock();
+
+ /** The job vertices taking part in the checkpoints. */
+ private final List<ExecutionJobVertex> jobVertices;
+
+ /** Total number of subtasks to checkpoint. */
+ private final int totalSubtaskCount;
+
+ /** Snapshotting settings created from the CheckpointConfig. */
+ private final JobSnapshottingSettings jobSnapshottingSettings;
+
+ /** Checkpoint counts. */
+ private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
+
+ /** A summary of the completed checkpoint stats. */
+ private final CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary();
+
+ /** History of checkpoints. */
+ private final CheckpointStatsHistory history;
+
+ /** The latest restored checkpoint. */
+ @Nullable
+ private RestoredCheckpointStats latestRestoredCheckpoint;
+
+ /** Latest created snapshot. */
+ private volatile CheckpointStatsSnapshot latestSnapshot;
+
+ /**
+ * Flag indicating whether a new snapshot needs to be created. This is true
+ * if a new checkpoint was triggered or updated (completed successfully or
+ * failed).
+ */
+ private volatile boolean dirty;
+
+ /**
+ * Creates a new checkpoint stats tracker.
+ *
+ * @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in progress ones.
+ * @param jobVertices Job vertices involved in the checkpoints.
+ * @param jobSnapshottingSettings Snapshotting settings created from the CheckpointConfig.
+ * @param metricGroup Metric group for exposed metrics
+ */
+ public CheckpointStatsTracker(
+ int numRememberedCheckpoints,
+ List<ExecutionJobVertex> jobVertices,
+ JobSnapshottingSettings jobSnapshottingSettings,
+ MetricGroup metricGroup) {
+
+ checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
+ this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
+ this.jobVertices = checkNotNull(jobVertices, "JobVertices");
+ this.jobSnapshottingSettings = checkNotNull(jobSnapshottingSettings);
+
+ // Compute the total subtask count. We do this here in order to only
+ // do it once.
+ int count = 0;
+ for (ExecutionJobVertex vertex : jobVertices) {
+ count += vertex.getParallelism();
+ }
+ this.totalSubtaskCount = count;
+
+ // Latest snapshot is empty
+ latestSnapshot = new CheckpointStatsSnapshot(
+ counts.createSnapshot(),
+ summary.createSnapshot(),
+ history.createSnapshot(),
+ null);
+
+ // Register the metrics
+ registerMetrics(metricGroup);
+ }
+
+ /**
+ * Returns the job's snapshotting settings which are derived from the
+ * CheckpointConfig.
+ *
+ * @return The job's snapshotting settings.
+ */
+ public JobSnapshottingSettings getSnapshottingSettings() {
+ return jobSnapshottingSettings;
+ }
+
+ /**
+ * Creates a new snapshot of the available stats.
+ *
+ * @return The latest statistics snapshot.
+ */
+ public CheckpointStatsSnapshot createSnapshot() {
+ CheckpointStatsSnapshot snapshot = latestSnapshot;
+
+ // Only create a new snapshot if dirty and no update in progress,
+ // because we don't want to block the coordinator.
+ if (dirty && statsReadWriteLock.tryLock()) {
+ try {
+ // Create a new snapshot
+ snapshot = new CheckpointStatsSnapshot(
+ counts.createSnapshot(),
+ summary.createSnapshot(),
+ history.createSnapshot(),
+ latestRestoredCheckpoint);
+
+ latestSnapshot = snapshot;
+
+ dirty = false;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
+ return snapshot;
+ }
+
+ // ------------------------------------------------------------------------
+ // Callbacks
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new pending checkpoint tracker.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param triggerTimestamp Trigger timestamp of the checkpoint.
+ * @param props The checkpoint properties.
+ * @return Tracker for statistics gathering.
+ */
+ PendingCheckpointStats reportPendingCheckpoint(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props) {
+
+ ConcurrentHashMap<JobVertexID, TaskStateStats> taskStateStats = createEmptyTaskStateStatsMap();
+
+ PendingCheckpointStats pending = new PendingCheckpointStats(
+ checkpointId,
+ triggerTimestamp,
+ props,
+ totalSubtaskCount,
+ taskStateStats,
+ new PendingCheckpointStatsCallback());
+
+ statsReadWriteLock.lock();
+ try {
+ counts.incrementInProgressCheckpoints();
+ history.addInProgressCheckpoint(pending);
+
+ dirty = true;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+
+ return pending;
+ }
+
+ void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
+ checkNotNull(restored, "Restored checkpoint");
+
+ statsReadWriteLock.lock();
+ try {
+ counts.incrementRestoredCheckpoints();
+ latestRestoredCheckpoint = restored;
+
+ dirty = true;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Callback when a checkpoint completes.
+ *
+ * @param completed The completed checkpoint stats.
+ */
+ private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+ statsReadWriteLock.lock();
+ try {
+ counts.incrementCompletedCheckpoints();
+ history.replacePendingCheckpointById(completed);
+
+ summary.updateSummary(completed);
+
+ dirty = true;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Callback when a checkpoint fails.
+ *
+ * @param failed The failed checkpoint stats.
+ */
+ private void reportFailedCheckpoint(FailedCheckpointStats failed) {
+ statsReadWriteLock.lock();
+ try {
+ counts.incrementFailedCheckpoints();
+ history.replacePendingCheckpointById(failed);
+
+ dirty = true;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Creates an empty map with a {@link TaskStateStats} instance per task
+ * that is involved in the checkpoint.
+ *
+ * @return An empty map with an {@link TaskStateStats} entry for each task that is involved in the checkpoint.
+ */
+ private ConcurrentHashMap<JobVertexID, TaskStateStats> createEmptyTaskStateStatsMap() {
+ ConcurrentHashMap<JobVertexID, TaskStateStats> taskStatsMap = new ConcurrentHashMap<>(jobVertices.size());
+ for (ExecutionJobVertex vertex : jobVertices) {
+ TaskStateStats taskStats = new TaskStateStats(vertex.getJobVertexId(), vertex.getParallelism());
+ taskStatsMap.put(vertex.getJobVertexId(), taskStats);
+ }
+ return taskStatsMap;
+ }
+
+ /**
+ * Callback for finalization of a pending checkpoint.
+ */
+ class PendingCheckpointStatsCallback {
+
+ /**
+ * Report a completed checkpoint.
+ *
+ * @param completed The completed checkpoint.
+ */
+ void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+ CheckpointStatsTracker.this.reportCompletedCheckpoint(completed);
+ }
+
+ /**
+ * Report a failed checkpoint.
+ *
+ * @param failed The failed checkpoint.
+ */
+ void reportFailedCheckpoint(FailedCheckpointStats failed) {
+ CheckpointStatsTracker.this.reportFailedCheckpoint(failed);
+ }
+
+ }
+
+ // ------------------------------------------------------------------------
+ // Metrics
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ static final String NUMBER_OF_CHECKPOINTS_METRIC = "totalNumberOfCheckpoints";
+
+ @VisibleForTesting
+ static final String NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC = "numberOfInProgressCheckpoints";
+
+ @VisibleForTesting
+ static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC = "numberOfCompletedCheckpoints";
+
+ @VisibleForTesting
+ static final String NUMBER_OF_FAILED_CHECKPOINTS_METRIC = "numberOfFailedCheckpoints";
+
+ @VisibleForTesting
+ static final String LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC = "lastCheckpointRestoreTimestamp";
+
+ @VisibleForTesting
+ static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = "lastCheckpointSize";
+
+ @VisibleForTesting
+ static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";
+
+ @VisibleForTesting
+ static final String LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC = "lastCheckpointAlignmentBuffered";
+
+ @VisibleForTesting
+ static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC = "lastCheckpointExternalPath";
+
+ /**
+ * Register the exposed metrics.
+ *
+ * @param metricGroup Metric group to use for the metrics.
+ */
+ private void registerMetrics(MetricGroup metricGroup) {
+ metricGroup.gauge(NUMBER_OF_CHECKPOINTS_METRIC, new CheckpointsCounter());
+ metricGroup.gauge(NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC, new InProgressCheckpointsCounter());
+ metricGroup.gauge(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC, new CompletedCheckpointsCounter());
+ metricGroup.gauge(NUMBER_OF_FAILED_CHECKPOINTS_METRIC, new FailedCheckpointsCounter());
+ metricGroup.gauge(LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC, new LatestRestoredCheckpointTimestampGauge());
+ metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new LatestCompletedCheckpointSizeGauge());
+ metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC, new LatestCompletedCheckpointDurationGauge());
+ metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC, new LatestCompletedCheckpointAlignmentBufferedGauge());
+ metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC, new LatestCompletedCheckpointExternalPathGauge());
+ }
+
+ private class CheckpointsCounter implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ return counts.getTotalNumberOfCheckpoints();
+ }
+ }
+
+ private class InProgressCheckpointsCounter implements Gauge<Integer> {
+ @Override
+ public Integer getValue() {
+ return counts.getNumberOfInProgressCheckpoints();
+ }
+ }
+
+ private class CompletedCheckpointsCounter implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ return counts.getNumberOfCompletedCheckpoints();
+ }
+ }
+
+ private class FailedCheckpointsCounter implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ return counts.getNumberOfFailedCheckpoints();
+ }
+ }
+
+ private class LatestRestoredCheckpointTimestampGauge implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ RestoredCheckpointStats restored = latestRestoredCheckpoint;
+ if (restored != null) {
+ return restored.getRestoreTimestamp();
+ } else {
+ return -1L;
+ }
+ }
+ }
+
+ private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ if (completed != null) {
+ return completed.getStateSize();
+ } else {
+ return -1L;
+ }
+ }
+ }
+
+ private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ if (completed != null) {
+ return completed.getEndToEndDuration();
+ } else {
+ return -1L;
+ }
+ }
+ }
+
+
+ private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> {
+ @Override
+ public Long getValue() {
+ CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ if (completed != null) {
+ return completed.getAlignmentBuffered();
+ } else {
+ return -1L;
+ }
+ }
+ }
+
+ private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> {
+ @Override
+ public String getValue() {
+ CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+ if (completed != null) {
+ return completed.getExternalPath();
+ } else {
+ return "n/a";
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 0e70b1a..52f2a6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.StateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
@@ -62,6 +63,10 @@ public class CompletedCheckpoint implements Serializable {
/** External path if persisted checkpoint; <code>null</code> otherwise. */
private final String externalPath;
+ /** Optional stats tracker callback for discard. */
+ @Nullable
+ private transient CompletedCheckpointStats.DiscardCallback discardCallback;
+
// ------------------------------------------------------------------------
public CompletedCheckpoint(
@@ -160,6 +165,10 @@ public class CompletedCheckpoint implements Serializable {
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
} finally {
taskStates.clear();
+
+ if (discardCallback != null) {
+ discardCallback.notifyDiscardedCheckpoint();
+ }
}
}
@@ -185,6 +194,15 @@ public class CompletedCheckpoint implements Serializable {
return externalPath;
}
+     /**
+      * Installs the optional stats callback that is notified when this
+      * checkpoint is discarded.
+      *
+      * @param discardCallback Callback to notify on discard, or <code>null</code>
+      *                        to skip the notification.
+      */
+     void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
+         this.discardCallback = discardCallback;
+     }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
new file mode 100644
index 0000000..4d2d995
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a successfully completed checkpoint.
+ *
+ * <p>The reported statistics are immutable except for the discarded flag, which
+ * is updated via the {@link DiscardCallback} and the {@link CompletedCheckpoint}
+ * after an instance of this class has been created.
+ */
+public class CompletedCheckpointStats extends AbstractCheckpointStats {
+
+ /** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */
+ private final DiscardCallback discardCallback;
+
+ /** Total checkpoint state size over all subtasks. */
+ private final long stateSize;
+
+ /** Buffered bytes during alignment over all subtasks. */
+ private final long alignmentBuffered;
+
+ /** The latest acknowledged subtask stats. */
+ private final SubtaskStateStats latestAcknowledgedSubtask;
+
+ /** Optional external path if persisted externally. */
+ @Nullable
+ private final String externalPath;
+
+ /** Flag indicating whether the checkpoint was discarded. */
+ private volatile boolean discarded;
+
+ /**
+ * Creates a tracker for a {@link CompletedCheckpoint}.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+ * @param props Checkpoint properties of the checkpoint.
+ * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+ * @param taskStats Task stats for each involved operator.
+ * @param numAcknowledgedSubtasks Number of acknowledged subtasks.
+ * @param stateSize Total checkpoint state size over all subtasks.
+ * @param alignmentBuffered Buffered bytes during alignment over all subtasks.
+ * @param latestAcknowledgedSubtask The latest acknowledged subtask stats.
+ * @param externalPath Optional external path if persisted externally.
+ */
+ CompletedCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int totalSubtaskCount,
+ Map<JobVertexID, TaskStateStats> taskStats,
+ int numAcknowledgedSubtasks,
+ long stateSize,
+ long alignmentBuffered,
+ SubtaskStateStats latestAcknowledgedSubtask,
+ @Nullable String externalPath) {
+
+ super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+ checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
+ checkArgument(stateSize >= 0, "Negative state size");
+ this.stateSize = stateSize;
+ this.alignmentBuffered = alignmentBuffered;
+ this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
+ this.externalPath = externalPath;
+ this.discardCallback = new DiscardCallback();
+ }
+
+ @Override
+ public CheckpointStatsStatus getStatus() {
+ return CheckpointStatsStatus.COMPLETED;
+ }
+
+ @Override
+ public int getNumberOfAcknowledgedSubtasks() {
+ return numberOfSubtasks;
+ }
+
+ @Override
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ @Override
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ @Override
+ @Nullable
+ public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+ return latestAcknowledgedSubtask;
+ }
+
+ // ------------------------------------------------------------------------
+ // Completed checkpoint specific methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the external path if this checkpoint was persisted externally.
+ *
+ * @return External path of this checkpoint or <code>null</code>.
+ */
+ @Nullable
+ public String getExternalPath() {
+ return externalPath;
+ }
+
+ /**
+ * Returns whether the checkpoint has been discarded.
+ *
+ * @return <code>true</code> if the checkpoint has been discarded, <code>false</code> otherwise.
+ */
+ public boolean isDiscarded() {
+ return discarded;
+ }
+
+ /**
+ * Returns the callback for the {@link CompletedCheckpoint}.
+ *
+ * @return Callback for the {@link CompletedCheckpoint}.
+ */
+ DiscardCallback getDiscardCallback() {
+ return discardCallback;
+ }
+
+ /**
+ * Callback for the {@link CompletedCheckpoint} instance to notify about
+ * disposal of the checkpoint (most commonly when the checkpoint has been
+ * subsumed by a newer one).
+ */
+ class DiscardCallback {
+
+ /**
+ * Updates the discarded flag of the checkpoint stats.
+ *
+ * <p>After this notification, {@link #isDiscarded()} will return
+ * <code>true</code>.
+ */
+ void notifyDiscardedCheckpoint() {
+ discarded = true;
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "CompletedCheckpoint(id=" + getCheckpointId() + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
new file mode 100644
index 0000000..7e3f4b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Summary over <strong>all</strong> completed checkpoints.
+ */
+public class CompletedCheckpointStatsSummary implements Serializable {
+
+ private static final long serialVersionUID = 5784360461635814038L;
+
+ /** State size statistics for all completed checkpoints. */
+ private final MinMaxAvgStats stateSize;
+
+ /** Duration statistics for all completed checkpoints. */
+ private final MinMaxAvgStats duration;
+
+ /** Byte buffered during alignment for all completed checkpoints. */
+ private final MinMaxAvgStats alignmentBuffered;
+
+ CompletedCheckpointStatsSummary() {
+ this(new MinMaxAvgStats(), new MinMaxAvgStats(), new MinMaxAvgStats());
+ }
+
+ private CompletedCheckpointStatsSummary(
+ MinMaxAvgStats stateSize,
+ MinMaxAvgStats duration,
+ MinMaxAvgStats alignmentBuffered) {
+
+ this.stateSize = checkNotNull(stateSize);
+ this.duration = checkNotNull(duration);
+ this.alignmentBuffered = checkNotNull(alignmentBuffered);
+ }
+
+ /**
+ * Updates the summary with the given completed checkpoint.
+ *
+ * @param completed Completed checkpoint to update the summary with.
+ */
+ void updateSummary(CompletedCheckpointStats completed) {
+ stateSize.add(completed.getStateSize());
+ duration.add(completed.getEndToEndDuration());
+ alignmentBuffered.add(completed.getAlignmentBuffered());
+ }
+
+ /**
+ * Creates a snapshot of the current state.
+ *
+ * @return A snapshot of the current state.
+ */
+ CompletedCheckpointStatsSummary createSnapshot() {
+ return new CompletedCheckpointStatsSummary(
+ stateSize.createSnapshot(),
+ duration.createSnapshot(),
+ alignmentBuffered.createSnapshot());
+ }
+
+ /**
+ * Returns the summary stats for the state size of completed checkpoints.
+ *
+ * @return Summary stats for the state size.
+ */
+ public MinMaxAvgStats getStateSizeStats() {
+ return stateSize;
+ }
+
+ /**
+ * Returns the summary stats for the duration of completed checkpoints.
+ *
+ * @return Summary stats for the duration.
+ */
+ public MinMaxAvgStats getEndToEndDurationStats() {
+ return duration;
+ }
+
+ /**
+ * Returns the summary stats for the bytes buffered during alignment.
+ *
+ * <p>If no alignments are reported or happen (at least once mode), the
+ * returned stats are in their initial state.
+ *
+ * @return Summary stats for the bytes buffered during alignment.
+ */
+ public MinMaxAvgStats getAlignmentBufferedStats() {
+ return alignmentBuffered;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
new file mode 100644
index 0000000..83d7c3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Statistics for a failed checkpoint.
+ *
+ * <p>The reported statistics are immutable.
+ */
+public class FailedCheckpointStats extends AbstractCheckpointStats {
+
+ /** Number of acknowledged tasks. */
+ private final int numAcknowledgedSubtasks;
+
+ /** Total checkpoint state size over all subtasks. */
+ private final long stateSize;
+
+ /** Buffered bytes during alignment over all subtasks. */
+ private final long alignmentBuffered;
+
+ /** Timestamp when the checkpoint was failed at the coordinator. */
+ private final long failureTimestamp;
+
+ /**
+ * The latest acknowledged subtask stats if any subtask was acknowledged
+ * before failing the checkpoint.
+ */
+ @Nullable
+ private final SubtaskStateStats latestAcknowledgedSubtask;
+
+ /** Optional failure message. */
+ @Nullable
+ private final String failureMsg;
+
+ /**
+ * Creates a tracker for a failed checkpoint.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+ * @param props Checkpoint properties of the checkpoint.
+ * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+ * @param taskStats Task stats for each involved operator.
+ * @param numAcknowledgedSubtasks Number of acknowledged subtasks.
+ * @param stateSize Total checkpoint state size over all subtasks.
+ * @param alignmentBuffered Buffered bytes during alignment over all subtasks.
+ * @param failureTimestamp Timestamp when this checkpoint failed.
+ * @param latestAcknowledgedSubtask The latest acknowledged subtask stats or <code>null</code>.
+ * @param cause Cause of the checkpoint failure or <code>null</code>.
+ */
+ FailedCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int totalSubtaskCount,
+ Map<JobVertexID, TaskStateStats> taskStats,
+ int numAcknowledgedSubtasks,
+ long stateSize,
+ long alignmentBuffered,
+ long failureTimestamp,
+ @Nullable SubtaskStateStats latestAcknowledgedSubtask,
+ @Nullable Throwable cause) {
+
+ super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+ checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs");
+ this.numAcknowledgedSubtasks = numAcknowledgedSubtasks;
+ checkArgument(stateSize >= 0, "Negative state size");
+ this.stateSize = stateSize;
+ this.alignmentBuffered = alignmentBuffered;
+ this.failureTimestamp = failureTimestamp;
+ this.latestAcknowledgedSubtask = latestAcknowledgedSubtask;
+ this.failureMsg = cause != null ? cause.getMessage() : null;
+ }
+
+ @Override
+ public CheckpointStatsStatus getStatus() {
+ return CheckpointStatsStatus.FAILED;
+ }
+
+ @Override
+ public int getNumberOfAcknowledgedSubtasks() {
+ return numAcknowledgedSubtasks;
+ }
+
+ @Override
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ @Override
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ @Override
+ @Nullable
+ public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+ return latestAcknowledgedSubtask;
+ }
+
+ /**
+ * Returns the end to end duration until the checkpoint failure.
+ */
+ @Override
+ public long getEndToEndDuration() {
+ return Math.max(0, failureTimestamp - triggerTimestamp);
+ }
+
+ // ------------------------------------------------------------------------
+ // Failed checkpoint specific methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the timestamp when this checkpoint failed.
+ *
+ * @return Timestamp when the checkpoint failed.
+ */
+ public long getFailureTimestamp() {
+ return failureTimestamp;
+ }
+
+ /**
+ * Returns the failure message or <code>null</code> if no cause was provided.
+ *
+ * @return Failure message of the checkpoint failure or <code>null</code>.
+ */
+ @Nullable
+ public String getFailureMessage() {
+ return failureMsg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
new file mode 100644
index 0000000..9d4c116
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+/**
+ * Helper for keeping track of min/max/average summaries.
+ */
+public class MinMaxAvgStats implements Serializable {
+
+ private static final long serialVersionUID = 1769601903483446707L;
+
+ /** Current min value. */
+ private long min;
+
+ /** Current max value. */
+ private long max;
+
+ /** Sum of all added values. */
+ private long sum;
+
+ /** Count of added values. */
+ private long count;
+
+ MinMaxAvgStats() {
+ }
+
+ private MinMaxAvgStats(long min, long max, long sum, long count) {
+ this.min = min;
+ this.max = max;
+ this.sum = sum;
+ this.count = count;
+ }
+
+ /**
+ * Adds the value to the stats if it is >= 0.
+ *
+ * @param value Value to add for min/max/avg stats..
+ */
+ void add(long value) {
+ if (value >= 0) {
+ if (count > 0) {
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ } else {
+ min = value;
+ max = value;
+ }
+
+ count++;
+ sum += value;
+ }
+ }
+
+ /**
+ * Returns a snapshot of the current state.
+ *
+ * @return A snapshot of the current state.
+ */
+ MinMaxAvgStats createSnapshot() {
+ return new MinMaxAvgStats(min, max, sum, count);
+ }
+
+ /**
+ * Returns the minimum seen value.
+ *
+ * @return The current minimum value.
+ */
+ public long getMinimum() {
+ return min;
+ }
+
+ /**
+ * Returns the maximum seen value.
+ *
+ * @return The current maximum value.
+ */
+ public long getMaximum() {
+ return max;
+ }
+
+ /**
+ * Returns the sum of all seen values.
+ *
+ * @return Sum of all values.
+ */
+ public long getSum() {
+ return sum;
+ }
+
+ /**
+ * Returns the count of all seen values.
+ *
+ * @return Count of all values.
+ */
+ public long getCount() {
+ return count;
+ }
+
+ /**
+ * Calculates the average over all seen values.
+ *
+ * @return Average over all seen values.
+ */
+ public long getAverage() {
+ if (count == 0) {
+ return 0;
+ } else {
+ return sum / count;
+ }
+ }
+
+}