You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:48:46 UTC

[10/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint stats tracking

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e7df5bc..1d97e12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,6 +35,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -72,9 +73,6 @@ public class PendingCheckpoint {
 	/** Set of acknowledged tasks */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
-	/** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
-	private final boolean isPeriodic;
-
 	/**
 	 * The checkpoint properties. If the checkpoint should be persisted
 	 * externally, it happens in {@link #finalizeCheckpoint()}.
@@ -93,6 +91,10 @@ public class PendingCheckpoint {
 
 	private boolean discarded;
 
+	/** Optional stats tracker callback. */
+	@Nullable
+	private PendingCheckpointStats statsCallback;
+
 	// --------------------------------------------------------------------------------------------
 
 	public PendingCheckpoint(
@@ -100,7 +102,6 @@ public class PendingCheckpoint {
 			long checkpointId,
 			long checkpointTimestamp,
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
-			boolean isPeriodic,
 			CheckpointProperties props,
 			String targetDirectory,
 			Executor executor) {
@@ -108,7 +109,6 @@ public class PendingCheckpoint {
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
-		this.isPeriodic = isPeriodic;
 		this.taskStates = new HashMap<>();
 		this.props = checkNotNull(props);
 		this.targetDirectory = targetDirectory;
@@ -163,10 +163,6 @@ public class PendingCheckpoint {
 		return discarded;
 	}
 
-	boolean isPeriodic() {
-		return isPeriodic;
-	}
-
 	/**
 	 * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
 	 * of newer checkpoints in progress.
@@ -186,6 +182,15 @@ public class PendingCheckpoint {
 		return targetDirectory;
 	}
 
+	/**
+	 * Sets the callback for tracking this pending checkpoint.
+	 *
+	 * @param trackerCallback Callback for collecting subtask stats.
+	 */
+	void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
+		this.statsCallback = checkNotNull(trackerCallback);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Progress and Completion
 	// ------------------------------------------------------------------------
@@ -227,6 +232,13 @@ public class PendingCheckpoint {
 
 			onCompletionPromise.complete(completed);
 
+			if (statsCallback != null) {
+				// Finalize the statsCallback and give the completed checkpoint a
+				// callback for discards.
+				CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
+				completed.setDiscardCallback(discardCallback);
+			}
+
 			dispose(false);
 
 			return completed;
@@ -238,14 +250,15 @@ public class PendingCheckpoint {
 	 *
 	 * @param executionAttemptId of the acknowledged task
 	 * @param subtaskState of the acknowledged task
+	 * @param checkpointMetaData Checkpoint meta data
 	 * @return TaskAcknowledgeResult of the operation
 	 */
 	public TaskAcknowledgeResult acknowledgeTask(
 			ExecutionAttemptID executionAttemptId,
-			SubtaskState subtaskState) {
+			SubtaskState subtaskState,
+			CheckpointMetaData checkpointMetaData) {
 
 		synchronized (lock) {
-
 			if (discarded) {
 				return TaskAcknowledgeResult.DISCARDED;
 			}
@@ -262,10 +275,12 @@ public class PendingCheckpoint {
 				acknowledgedTasks.add(executionAttemptId);
 			}
 
-			if (null != subtaskState) {
+			JobVertexID jobVertexID = vertex.getJobvertexId();
+			int subtaskIndex = vertex.getParallelSubtaskIndex();
+			long ackTimestamp = System.currentTimeMillis();
 
-				JobVertexID jobVertexID = vertex.getJobvertexId();
-				int subtaskIndex = vertex.getParallelSubtaskIndex();
+			long stateSize = 0;
+			if (null != subtaskState) {
 				TaskState taskState = taskStates.get(jobVertexID);
 
 				if (null == taskState) {
@@ -292,14 +307,30 @@ public class PendingCheckpoint {
 					taskStates.put(jobVertexID, taskState);
 				}
 
-				long duration = System.currentTimeMillis() - checkpointTimestamp;
-				subtaskState.setDuration(duration);
-
 				taskState.putState(subtaskIndex, subtaskState);
+				stateSize = subtaskState.getStateSize();
 			}
 
 			++numAcknowledgedTasks;
 
+			if (statsCallback != null) {
+				CheckpointMetrics metrics = checkpointMetaData.getMetrics();
+
+				// Do this in millis because the web frontend works with them
+				long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
+
+				SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
+					subtaskIndex,
+					ackTimestamp,
+					stateSize,
+					metrics.getSyncDurationMillis(),
+					metrics.getAsyncDurationMillis(),
+					metrics.getBytesBufferedInAlignment(),
+					alignmentDurationMillis);
+
+				statsCallback.reportSubtaskStats(jobVertexID, subtaskStateStats);
+			}
+
 			return TaskAcknowledgeResult.SUCCESS;
 		}
 	}
@@ -323,7 +354,9 @@ public class PendingCheckpoint {
 	 */
 	public void abortExpired() {
 		try {
-			onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing"));
+			Exception cause = new Exception("Checkpoint expired before completing");
+			onCompletionPromise.completeExceptionally(cause);
+			reportFailedCheckpoint(cause);
 		} finally {
 			dispose(true);
 		}
@@ -334,12 +367,12 @@ public class PendingCheckpoint {
 	 */
 	public void abortSubsumed() {
 		try {
-			if (props.forceCheckpoint()) {
-				onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed"));
+			Exception cause = new Exception("Checkpoints has been subsumed");
+			onCompletionPromise.completeExceptionally(cause);
+			reportFailedCheckpoint(cause);
 
+			if (props.forceCheckpoint()) {
 				throw new IllegalStateException("Bug: forced checkpoints must never be subsumed");
-			} else {
-				onCompletionPromise.completeExceptionally(new Exception("Checkpoints has been subsumed"));
 			}
 		} finally {
 			dispose(true);
@@ -348,7 +381,9 @@ public class PendingCheckpoint {
 
 	public void abortDeclined() {
 		try {
-			onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)"));
+			Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
+			onCompletionPromise.completeExceptionally(cause);
+			reportFailedCheckpoint(cause);
 		} finally {
 			dispose(true);
 		}
@@ -360,7 +395,9 @@ public class PendingCheckpoint {
 	 */
 	public void abortError(Throwable cause) {
 		try {
-			onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+			Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
+			onCompletionPromise.completeExceptionally(failure);
+			reportFailedCheckpoint(failure);
 		} finally {
 			dispose(true);
 		}
@@ -393,6 +430,18 @@ public class PendingCheckpoint {
 		}
 	}
 
+	/**
+	 * Reports a failed checkpoint with the given optional cause.
+	 *
+	 * @param cause The failure cause or <code>null</code>.
+	 */
+	private void reportFailedCheckpoint(Exception cause) {
+		if (statsCallback != null) {
+			long failureTimestamp = System.currentTimeMillis();
+			statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
new file mode 100644
index 0000000..e6fa80f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a pending checkpoint that is still in progress.
+ *
+ * <p>This is the starting point for all checkpoint tracking. The life cycle
+ * of instances of this class is tightly coupled to a {@link PendingCheckpoint}
+ * instance, which forwards statistics about acknowledged subtasks
+ * via {@link #reportSubtaskStats(JobVertexID, SubtaskStateStats)}.
+ *
+ * <p>Depending on whether the {@link PendingCheckpoint} is finalized
+ * successfully or aborted, we replace ourselves with a {@link CompletedCheckpointStats}
+ * or {@link FailedCheckpointStats} and notify the {@link CheckpointStatsTracker}.
+ *
+ * <p>The statistics gathered here are all live updated.
+ */
+public class PendingCheckpointStats extends AbstractCheckpointStats {
+
+	/** Tracker callback when the pending checkpoint is finalized or aborted. */
+	private final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
+
+	/** The current number of acknowledged subtasks. */
+	private volatile int currentNumAcknowledgedSubtasks;
+
+	/** Current checkpoint state size over all collected subtasks. */
+	private volatile long currentStateSize;
+
+	/** Current buffered bytes during alignment over all collected subtasks. */
+	private volatile long currentAlignmentBuffered;
+
+	/** Stats of the latest acknowleged subtask. */
+	private volatile SubtaskStateStats latestAcknowledgedSubtask;
+
+	/**
+	 * Creates a tracker for a {@link PendingCheckpoint}.
+	 *
+	 * @param checkpointId ID of the checkpoint.
+	 * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+	 * @param props Checkpoint properties of the checkpoint.
+	 * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+	 * @param taskStats Task stats for each involved operator.
+	 * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
+	 */
+	PendingCheckpointStats(
+			long checkpointId,
+			long triggerTimestamp,
+			CheckpointProperties props,
+			int totalSubtaskCount,
+			Map<JobVertexID, TaskStateStats> taskStats,
+			CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+
+		super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+		this.trackerCallback = checkNotNull(trackerCallback);
+	}
+
+	@Override
+	public CheckpointStatsStatus getStatus() {
+		return CheckpointStatsStatus.IN_PROGRESS;
+	}
+
+	@Override
+	public int getNumberOfAcknowledgedSubtasks() {
+		return currentNumAcknowledgedSubtasks;
+	}
+
+	@Override
+	public long getStateSize() {
+		return currentStateSize;
+	}
+
+	@Override
+	public long getAlignmentBuffered() {
+		return currentAlignmentBuffered;
+	}
+
+	@Override
+	public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+		return latestAcknowledgedSubtask;
+	}
+
+	// ------------------------------------------------------------------------
+	// Callbacks from the PendingCheckpoint instance
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reports statistics for a single subtask.
+	 *
+	 * @param jobVertexId ID of the task/operator the subtask belongs to.
+	 * @param subtask The statistics for the subtask.
+	 * @return <code>true</code> if successfully reported or <code>false</code> otherwise.
+	 */
+	boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
+		TaskStateStats taskStateStats = taskStats.get(jobVertexId);
+
+		if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
+			currentNumAcknowledgedSubtasks++;
+			latestAcknowledgedSubtask = subtask;
+
+			currentStateSize += subtask.getStateSize();
+
+			long alignmentBuffered = subtask.getAlignmentBuffered();
+			if (alignmentBuffered > 0) {
+				currentAlignmentBuffered += alignmentBuffered;
+			}
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Reports a successfully completed pending checkpoint.
+	 *
+	 * @param externalPath Optional external storage path if checkpoint was externalized.
+	 * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal.
+	 */
+	CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(@Nullable String externalPath) {
+		CompletedCheckpointStats completed = new CompletedCheckpointStats(
+			checkpointId,
+			triggerTimestamp,
+			props,
+			numberOfSubtasks,
+			new HashMap<>(taskStats),
+			currentNumAcknowledgedSubtasks,
+			currentStateSize,
+			currentAlignmentBuffered,
+			latestAcknowledgedSubtask,
+			externalPath);
+
+		trackerCallback.reportCompletedCheckpoint(completed);
+
+		return completed.getDiscardCallback();
+	}
+
+	/**
+	 * Reports a failed pending checkpoint.
+	 *
+	 * @param failureTimestamp Timestamp of the failure.
+	 * @param cause Optional cause of the failure.
+	 */
+	void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
+		FailedCheckpointStats failed = new FailedCheckpointStats(
+			checkpointId,
+			triggerTimestamp,
+			props,
+			numberOfSubtasks,
+			new HashMap<>(taskStats),
+			currentNumAcknowledgedSubtasks,
+			currentStateSize,
+			currentAlignmentBuffered,
+			failureTimestamp,
+			latestAcknowledgedSubtask,
+			cause);
+
+		trackerCallback.reportFailedCheckpoint(failed);
+	}
+
+	@Override
+	public String toString() {
+		return "PendingCheckpoint(id=" + getCheckpointId() + ")";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
new file mode 100644
index 0000000..c21937a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a restored checkpoint.
+ */
+public class RestoredCheckpointStats implements Serializable {
+
+	private static final long serialVersionUID = 2305815319666360821L;
+
+	/** ID of the restored checkpoint. */
+	private final long checkpointId;
+
+	/** Properties of the restored checkpoint. */
+	private final CheckpointProperties props;
+
+	/** Timestamp when the checkpoint was restored at the coordinator. */
+	private final long restoreTimestamp;
+
+	/** Optional external path. */
+	@Nullable
+	private final String externalPath;
+
+	/**
+	 * Creates a new restored checkpoint stats.
+	 *
+	 * @param checkpointId ID of the checkpoint.
+	 * @param props Checkpoint properties of the checkpoint.
+	 * @param restoreTimestamp Timestamp when the checkpoint was restored.
+	 * @param externalPath Optional external path if persisted externally.
+	 */
+	RestoredCheckpointStats(
+			long checkpointId,
+			CheckpointProperties props,
+			long restoreTimestamp,
+			String externalPath) {
+
+		this.checkpointId = checkpointId;
+		this.props = checkNotNull(props, "Checkpoint Properties");
+		this.restoreTimestamp = restoreTimestamp;
+		this.externalPath = externalPath;
+	}
+
+	/**
+	 * Returns the ID of this checkpoint.
+	 *
+	 * @return ID of this checkpoint.
+	 */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Returns the properties of the restored checkpoint.
+	 *
+	 * @return Properties of the restored checkpoint.
+	 */
+	public CheckpointProperties getProperties() {
+		return props;
+	}
+
+	/**
+	 * Returns the timestamp when the checkpoint was restored.
+	 *
+	 * @return Timestamp when the checkpoint was restored.
+	 */
+	public long getRestoreTimestamp() {
+		return restoreTimestamp;
+	}
+
+	/**
+	 * Returns the external path if this checkpoint was persisted externally.
+	 *
+	 * @return External path of this checkpoint or <code>null</code>.
+	 */
+	@Nullable
+	public String getExternalPath() {
+		return externalPath;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index ca51e1a..1393e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -70,39 +70,19 @@ public class SubtaskState implements StateObject {
 	 */
 	private final long stateSize;
 
-	/**
-	 * The duration of the checkpoint (ack timestamp - trigger timestamp).
-	 */
-	private long duration;
-
 	public SubtaskState(
 			ChainedStateHandle<StreamStateHandle> legacyOperatorState,
 			ChainedStateHandle<OperatorStateHandle> managedOperatorState,
 			ChainedStateHandle<OperatorStateHandle> rawOperatorState,
 			KeyGroupsStateHandle managedKeyedState,
 			KeyGroupsStateHandle rawKeyedState) {
-		this(legacyOperatorState,
-				managedOperatorState,
-				rawOperatorState,
-				managedKeyedState,
-				rawKeyedState,
-				0L);
-	}
-
-	public SubtaskState(
-			ChainedStateHandle<StreamStateHandle> legacyOperatorState,
-			ChainedStateHandle<OperatorStateHandle> managedOperatorState,
-			ChainedStateHandle<OperatorStateHandle> rawOperatorState,
-			KeyGroupsStateHandle managedKeyedState,
-			KeyGroupsStateHandle rawKeyedState,
-			long duration) {
 
 		this.legacyOperatorState = checkNotNull(legacyOperatorState, "State");
 		this.managedOperatorState = managedOperatorState;
 		this.rawOperatorState = rawOperatorState;
 		this.managedKeyedState = managedKeyedState;
 		this.rawKeyedState = rawKeyedState;
-		this.duration = duration;
+
 		try {
 			long calculateStateSize = getSizeNullSafe(legacyOperatorState);
 			calculateStateSize += getSizeNullSafe(managedOperatorState);
@@ -147,10 +127,6 @@ public class SubtaskState implements StateObject {
 		return stateSize;
 	}
 
-	public long getDuration() {
-		return duration;
-	}
-
 	@Override
 	public void discardState() throws Exception {
 		StateUtil.bestEffortDiscardAllStateObjects(
@@ -162,13 +138,8 @@ public class SubtaskState implements StateObject {
 						rawKeyedState));
 	}
 
-	public void setDuration(long duration) {
-		this.duration = duration;
-	}
-
 	// --------------------------------------------------------------------------------------------
 
-
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -183,9 +154,7 @@ public class SubtaskState implements StateObject {
 		if (stateSize != that.stateSize) {
 			return false;
 		}
-		if (duration != that.duration) {
-			return false;
-		}
+
 		if (legacyOperatorState != null ?
 				!legacyOperatorState.equals(that.legacyOperatorState)
 				: that.legacyOperatorState != null) {
@@ -220,7 +189,6 @@ public class SubtaskState implements StateObject {
 		result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0);
 		result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0);
 		result = 31 * result + (int) (stateSize ^ (stateSize >>> 32));
-		result = 31 * result + (int) (duration ^ (duration >>> 32));
 		return result;
 	}
 
@@ -233,7 +201,6 @@ public class SubtaskState implements StateObject {
 				", keyedStateFromBackend=" + managedKeyedState +
 				", keyedStateHandleFromStream=" + rawKeyedState +
 				", stateSize=" + stateSize +
-				", duration=" + duration +
 				'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
new file mode 100644
index 0000000..3a66032
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Statistics for a single subtask that is part of a checkpoint.
+ *
+ * <p>Collects data that is spread over different close places:
+ * {@link CheckpointMetaData},
+ * {@link SubtaskState}, and
+ * {@link PendingCheckpoint}.
+ *
+ * <p>This is the smallest immutable unit of the stats.
+ */
+public class SubtaskStateStats {
+	
+	/** Index of this sub task. */
+	private final int subtaskIndex;
+
+	/**
+	 * Timestamp when the ack from this sub task was received at the
+	 * coordinator.
+	 */
+	private final long ackTimestamp;
+
+	/** Size of the checkpointed state at this subtask. */
+	private final long stateSize;
+
+	/** Checkpoint duration at the operator (sync part) in milliseconds. */
+	private final long syncCheckpointDuration;
+
+	/** Checkpoint duration at the operator (async part) in milliseconds. */
+	private final long asyncCheckpointDuration;
+
+	/** Number of buffered bytes during alignment. */
+	private final long alignmentBuffered;
+
+	/** Alignment duration in . */
+	private final long alignmentDuration;
+
+	/**
+	 * Creates the stats for a single subtask.
+	 *
+	 * @param subtaskIndex Index of the subtask.
+	 * @param ackTimestamp Timestamp when the acknowledgement of this subtask was received at the coordinator.
+	 * @param stateSize Size of the checkpointed state at this subtask.
+	 * @param syncCheckpointDuration Checkpoint duration at the task (synchronous part)
+	 * @param asyncCheckpointDuration  Checkpoint duration at the task (asynchronous part)
+	 * @param alignmentBuffered Bytes buffered during stream alignment (for exactly-once only).
+	 * @param alignmentDuration Duration of the stream alignment (for exactly-once only).
+	 */
+	SubtaskStateStats(
+			int subtaskIndex,
+			long ackTimestamp,
+			long stateSize,
+			long syncCheckpointDuration,
+			long asyncCheckpointDuration,
+			long alignmentBuffered,
+			long alignmentDuration) {
+
+		checkArgument(subtaskIndex >= 0, "Negative subtask index");
+		this.subtaskIndex = subtaskIndex;
+		checkArgument(stateSize >= 0, "Negative state size");
+		this.stateSize = stateSize;
+		this.ackTimestamp = ackTimestamp;
+		this.syncCheckpointDuration = syncCheckpointDuration;
+		this.asyncCheckpointDuration = asyncCheckpointDuration;
+		this.alignmentBuffered = alignmentBuffered;
+		this.alignmentDuration = alignmentDuration;
+	}
+
+	/**
+	 * Returns the subtask index.
+	 *
+	 * @return Subtask index.
+	 */
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+
+	/**
+	 * Returns the size of the checkpointed state at this subtask.
+	 *
+	 * @return Checkpoint state size of the sub task.
+	 */
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	/**
+	 * Returns the timestamp when the acknowledgement of this subtask was
+	 * received at the coordinator.
+	 *
+	 * @return ACK timestamp at the coordinator.
+	 */
+	public long getAckTimestamp() {
+		return ackTimestamp;
+	}
+
+	/**
+	 * Computes the duration since the given trigger timestamp.
+	 *
+	 * <p>If the trigger timestamp is greater than the ACK timestamp, this
+	 * returns <code>0</code>.
+	 *
+	 * @param triggerTimestamp Trigger timestamp of the checkpoint.
+	 * @return Duration since the given trigger timestamp.
+	 */
+	public long getEndToEndDuration(long triggerTimestamp) {
+		return Math.max(0, ackTimestamp - triggerTimestamp);
+	}
+
+	/**
+	 * Returns the duration of the synchronous part of the checkpoint.
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Duration of the synchronous part of the checkpoint or <code>-1</code>.
+	 */
+	public long getSyncCheckpointDuration() {
+		return syncCheckpointDuration;
+	}
+
+	/**
+	 * Returns the duration of the asynchronous part of the checkpoint.
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Duration of the asynchronous part of the checkpoint or <code>-1</code>.
+	 */
+	public long getAsyncCheckpointDuration() {
+		return asyncCheckpointDuration;
+	}
+
+	/**
+	 * Returns the number of bytes buffered during stream alignment (for
+	 * exactly-once only).
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Number of bytes buffered during stream alignment or <code>-1</code>.
+	 */
+	public long getAlignmentBuffered() {
+		return alignmentBuffered;
+	}
+
+	/**
+	 * Returns the duration of the stream alignment (for exactly-once only).
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Duration of the stream alignment or <code>-1</code>.
+	 */
+	public long getAlignmentDuration() {
+		return alignmentDuration;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
new file mode 100644
index 0000000..fc118d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a single task/operator that gathers all statistics of its
+ * subtasks and provides summary statistics about all subtasks.
+ */
+public class TaskStateStats {
+
+	/** ID of the task the stats belong to. */
+	private final JobVertexID jobVertexId;
+
+	/** Stats for each subtask */
+	private final SubtaskStateStats[] subtaskStats;
+
+	/** A summary of the subtask stats. */
+	private final TaskStateStatsSummary summaryStats = new TaskStateStatsSummary();
+
+	/** Number of acknowledged subtasks. */
+	private int numAcknowledgedSubtasks;
+
+	/** The latest acknowledged subtask stats. */
+	@Nullable
+	private SubtaskStateStats latestAckedSubtaskStats;
+
+	TaskStateStats(JobVertexID jobVertexId, int numSubtasks) {
+		this.jobVertexId = checkNotNull(jobVertexId, "JobVertexID");
+		checkArgument(numSubtasks > 0, "Number of subtasks <= 0");
+		this.subtaskStats = new SubtaskStateStats[numSubtasks];
+	}
+
+	/**
+	 * Hands in the stats for a subtask.
+	 *
+	 * @param subtask Stats for the sub task to hand in.
+	 */
+	boolean reportSubtaskStats(SubtaskStateStats subtask) {
+		checkNotNull(subtask, "Subtask stats");
+		int subtaskIndex = subtask.getSubtaskIndex();
+
+		if (subtaskIndex < 0 || subtaskIndex >= subtaskStats.length) {
+			return false;
+		}
+
+		if (subtaskStats[subtaskIndex] == null) {
+			subtaskStats[subtaskIndex] = subtask;
+
+			latestAckedSubtaskStats = subtask;
+			numAcknowledgedSubtasks++;
+
+			summaryStats.updateSummary(subtask);
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Returns the ID of the operator the statistics belong to.
+	 *
+	 * @return ID of the operator the statistics belong to.
+	 */
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	/**
+	 * Returns the number of subtasks.
+	 *
+	 * @return Number of subtasks.
+	 */
+	public int getNumberOfSubtasks() {
+		return subtaskStats.length;
+	}
+
+	/**
+	 * Returns the number of acknowledged subtasks.
+	 *
+	 * @return Number of acknowledged subtasks.
+	 */
+	public int getNumberOfAcknowledgedSubtasks() {
+		return numAcknowledgedSubtasks;
+	}
+
+	/**
+	 * Returns the latest acknowledged subtask stats or <code>null</code>
+	 * if none was acknowledged yet.
+	 *
+	 * @return The latest acknowledged subtask stats.
+	 */
+	@Nullable
+	public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+		return latestAckedSubtaskStats;
+	}
+
+	/**
+	 * Returns the ack timestamp of the latest acknowledged subtask or
+	 * <code>-1</code> if none was acknowledged yet.
+	 *
+	 * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code>.
+	 */
+	public long getLatestAckTimestamp() {
+		SubtaskStateStats subtask = latestAckedSubtaskStats;
+		if (subtask != null) {
+			return subtask.getAckTimestamp();
+		} else {
+			return -1;
+		}
+	}
+
+	/**
+	 * Returns the total checkpoint state size over all subtasks.
+	 *
+	 * @return Total checkpoint state size over all subtasks.
+	 */
+	public long getStateSize() {
+		return summaryStats.getStateSizeStats().getSum();
+	}
+
+	/**
+	 * Returns the total buffered bytes during alignment over all subtasks.
+	 *
+	 * <p>Can return <code>-1</code> if the runtime did not report this.
+	 *
+	 * @return Total buffered bytes during alignment over all subtasks.
+	 */
+	public long getAlignmentBuffered() {
+		return summaryStats.getAlignmentBufferedStats().getSum();
+	}
+
+	/**
+	 * Returns the duration of this checkpoint at the task/operator calculated
+	 * as the time since triggering until the latest acknowledged subtask
+	 * or <code>-1</code> if no subtask was acknowledged yet.
+	 *
+	 * @return Duration of this checkpoint at the task/operator or <code>-1</code> if no subtask was acknowledged yet.
+	 */
+	public long getEndToEndDuration(long triggerTimestamp) {
+		SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+		if (subtask != null) {
+			return Math.max(0, subtask.getAckTimestamp() - triggerTimestamp);
+		} else {
+			return -1;
+		}
+	}
+
+	/**
+	 * Returns the stats for all subtasks.
+	 *
+	 * <p>Elements of the returned array are <code>null</code> if no stats are
+	 * available yet for the respective subtask.
+	 *
+	 * <p>Note: The returned array must not be modified.
+	 *
+	 * @return Array of subtask stats (elements are <code>null</code> if no stats available yet).
+	 */
+	public SubtaskStateStats[] getSubtaskStats() {
+		return subtaskStats;
+	}
+
+	/**
+	 * Returns the summary of the subtask stats.
+	 *
+	 * @return Summary of the subtask stats.
+	 */
+	public TaskStateStatsSummary getSummaryStats() {
+		return summaryStats;
+	}
+
+	/**
+	 * Summary of the subtask stats of a single task/operator.
+	 */
+	public static class TaskStateStatsSummary {
+
+		private MinMaxAvgStats stateSize = new MinMaxAvgStats();
+		private MinMaxAvgStats ackTimestamp = new MinMaxAvgStats();
+		private MinMaxAvgStats syncCheckpointDuration = new MinMaxAvgStats();
+		private MinMaxAvgStats asyncCheckpointDuration = new MinMaxAvgStats();
+		private MinMaxAvgStats alignmentBuffered = new MinMaxAvgStats();
+		private MinMaxAvgStats alignmentDuration = new MinMaxAvgStats();
+
+		/**
+		 * Updates the summary with the given subtask.
+		 *
+		 * @param subtaskStats Subtask stats to update the summary with.
+		 */
+		void updateSummary(SubtaskStateStats subtaskStats) {
+			stateSize.add(subtaskStats.getStateSize());
+			ackTimestamp.add(subtaskStats.getAckTimestamp());
+			syncCheckpointDuration.add(subtaskStats.getSyncCheckpointDuration());
+			asyncCheckpointDuration.add(subtaskStats.getAsyncCheckpointDuration());
+			alignmentBuffered.add(subtaskStats.getAlignmentBuffered());
+			alignmentDuration.add(subtaskStats.getAlignmentDuration());
+		}
+
+		/**
+		 * Returns the summary stats for the state size.
+		 *
+		 * @return Summary stats for the state size.
+		 */
+		public MinMaxAvgStats getStateSizeStats() {
+			return stateSize;
+		}
+
+		/**
+		 * Returns the summary stats for the ACK timestamps.
+		 *
+		 * @return Summary stats for the state size.
+		 */
+		public MinMaxAvgStats getAckTimestampStats() {
+			return ackTimestamp;
+		}
+
+		/**
+		 * Returns the summary stats for the sync checkpoint duration.
+		 *
+		 * @return Summary stats for the sync checkpoint duration.
+		 */
+		public MinMaxAvgStats getSyncCheckpointDurationStats() {
+			return syncCheckpointDuration;
+		}
+
+		/**
+		 * Returns the summary stats for the async checkpoint duration.
+		 *
+		 * @return Summary stats for the async checkpoint duration.
+		 */
+		public MinMaxAvgStats getAsyncCheckpointDurationStats() {
+			return asyncCheckpointDuration;
+		}
+
+		/**
+		 * Returns the summary stats for the buffered bytes during alignments.
+		 *
+		 * @return Summary stats for the buffered state size during alignment.
+		 */
+		public MinMaxAvgStats getAlignmentBufferedStats() {
+			return alignmentBuffered;
+		}
+
+		/**
+		 * Returns the summary stats for the alignment duration.
+		 *
+		 * @return Summary stats for the duration of the alignment.
+		 */
+		public MinMaxAvgStats getAlignmentDurationStats() {
+			return alignmentDuration;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 4d16c13..48324ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -126,7 +126,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 
 	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
 
-		dos.writeLong(subtaskState.getDuration());
+		dos.writeLong(-1);
 
 		ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
 
@@ -160,12 +160,11 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 
 		KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
 		serializeKeyGroupStateHandle(keyedStateStream, dos);
-
 	}
 
 	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
-
-		long duration = dis.readLong();
+		// Duration field has been removed from SubtaskState
+		long ignoredDuration = dis.readLong();
 
 		int len = dis.readInt();
 		List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
@@ -207,8 +206,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				operatorStateBackendChain,
 				operatorStateStreamChain,
 				keyedStateBackend,
-				keyedStateStream,
-				duration);
+				keyedStateStream);
 	}
 
 	private static void serializeKeyGroupStateHandle(

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index e7fe1b0..3490dc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
index 92af0c8..43b5889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -18,10 +18,8 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
 
 /**
  * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
@@ -70,16 +68,10 @@ public interface AccessExecutionJobVertex {
 	ExecutionState getAggregateState();
 
 	/**
-	 * Returns the {@link OperatorCheckpointStats} for this job vertex.
-	 *
-	 * @return checkpoint stats for this job vertex.
-	 */
-	Option<OperatorCheckpointStats> getCheckpointStats();
-
-	/**
 	 * Returns the aggregated user-defined accumulators as strings.
 	 *
 	 * @return aggregated user-defined accumulators as strings.
 	 */
 	StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 0bd5319..440ecda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.SerializedValue;
@@ -77,7 +76,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	private final ArchivedExecutionConfig archivedExecutionConfig;
 	private final boolean isStoppable;
 	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
-	private final ArchivedCheckpointStatsTracker tracker;
+	private final CheckpointStatsTracker tracker;
 
 	public ArchivedExecutionGraph(
 		JobID jobID,
@@ -92,7 +91,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 		Map<String, SerializedValue<Object>> serializedUserAccumulators,
 		ArchivedExecutionConfig executionConfig,
 		boolean isStoppable,
-		ArchivedCheckpointStatsTracker tracker
+		CheckpointStatsTracker tracker
 	) {
 		this.jobID = jobID;
 		this.jobName = jobName;

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index e30f45a..c744907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -18,11 +18,8 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
 
 import java.io.Serializable;
 
@@ -41,7 +38,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 
 	private final int maxParallelism;
 
-	private final Option<OperatorCheckpointStats> checkpointStats;
 	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
 
 	public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
@@ -56,10 +52,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 		this.name = jobVertex.getJobVertex().getName();
 		this.parallelism = jobVertex.getParallelism();
 		this.maxParallelism = jobVertex.getMaxParallelism();
-		CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
-		checkpointStats = tracker != null
-			? tracker.getOperatorStats(this.id)
-			: Option.<OperatorCheckpointStats>empty();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -106,12 +98,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public Option<OperatorCheckpointStats> getCheckpointStats() {
-		return checkpointStats;
-	}
-
-	@Override
 	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
 		return archivedUserAccumulators;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cbb4c7e..058872a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -33,16 +34,12 @@ import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -63,8 +60,6 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
-
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -77,7 +72,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -369,7 +363,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			LOG.error("Error while shutting down checkpointer.");
 		}
 
-		checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
+		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
@@ -385,9 +379,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointDir,
-			checkpointStatsTracker,
 			ioExecutor);
 
+		checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
+
 		// interval of max long value indicates disable periodic checkpoint,
 		// the CheckpointActivatorDeactivator should be created only if the interval is not max value
 		if (interval != Long.MAX_VALUE) {
@@ -1291,28 +1286,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	@Override
 	public ArchivedExecutionGraph archive() {
-		Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>();
 		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
 		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
 		for (ExecutionJobVertex task : verticesInCreationOrder) {
 			ArchivedExecutionJobVertex archivedTask = task.archive();
 			archivedVerticesInCreationOrder.add(archivedTask);
 			archivedTasks.put(task.getJobVertexId(), archivedTask);
-			Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats();
-			if (statsOption.isDefined()) {
-				operatorStats.put(task.getJobVertexId(), statsOption.get());
-			}
 		}
 
-		Option<JobCheckpointStats> jobStats;
-		if (getCheckpointStatsTracker() == null) {
-			jobStats = Option.empty();
-		} else {
-			jobStats = getCheckpointStatsTracker().getJobStats();
-		}
-
-		ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats);
-
 		Map<String, SerializedValue<Object>> serializedUserAccumulators;
 		try {
 			serializedUserAccumulators = getAccumulatorsSerialized();
@@ -1334,7 +1315,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedUserAccumulators,
 			getArchivedExecutionConfig(),
 			isStoppable(),
-			statsTracker
-		);
+			getCheckpointStatsTracker());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a1d7385..386f202 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,9 +28,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -176,24 +174,18 @@ public class ExecutionGraphBuilder {
 				throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
 			}
 
-			// Checkpoint stats tracker
-			boolean isStatsDisabled = jobManagerConfig.getBoolean(
-					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+			// Maximum number of remembered checkpoints
+			int historySize = jobManagerConfig.getInteger(
+					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
 
-			CheckpointStatsTracker checkpointStatsTracker;
-			if (isStatsDisabled) {
-				checkpointStatsTracker = new DisabledCheckpointStatsTracker();
-			}
-			else {
-				int historySize = jobManagerConfig.getInteger(
-						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-
-				checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
-			}
+			CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
+					historySize,
+					ackVertices,
+					snapshotSettings,
+					metrics);
 
-			/** The default directory for externalized checkpoints. */
+			// The default directory for externalized checkpoints
 			String externalizedCheckpointsDir = jobManagerConfig.getString(
 					ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7f2545c..fbab572 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -30,8 +30,6 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -47,7 +45,6 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -289,16 +286,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		
 		return getAggregateJobVertexState(num, parallelism);
 	}
-	
-	@Override
-	public Option<OperatorCheckpointStats> getCheckpointStats() {
-		CheckpointStatsTracker tracker = getGraph().getCheckpointStatsTracker();
-		if (tracker == null) {
-			return Option.empty();
-		} else {
-			return tracker.getOperatorStats(getJobVertexId());
-		}
-	}
 
 	//---------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 7d6b36e..561ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -49,6 +49,15 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 
 	/** Settings for externalized checkpoints. */
 	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+
+	/**
+	 * Flag indicating whether exactly once checkpoint mode has been configured.
+	 * If <code>false</code>, at least once mode has been configured. This is
+	 * not a necessary attribute, because the checkpointing mode is only relevant
+	 * for the stream tasks, but we expose it here to forward it to the web runtime
+	 * UI.
+	 */
+	private final boolean isExactlyOnce;
 	
 	public JobSnapshottingSettings(
 			List<JobVertexID> verticesToTrigger,
@@ -58,7 +67,8 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
-			ExternalizedCheckpointSettings externalizedCheckpointSettings) {
+			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			boolean isExactlyOnce) {
 
 		// sanity checks
 		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
@@ -74,6 +84,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+		this.isExactlyOnce = isExactlyOnce;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -110,6 +121,10 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 		return externalizedCheckpointSettings;
 	}
 
+	public boolean isExactlyOnce() {
+		return isExactlyOnce;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index c63bac5..7ec3efa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -37,7 +37,6 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	private static final long serialVersionUID = -7606214777192401493L;
 
-
 	private final SubtaskState subtaskState;
 
 	private final CheckpointMetaData checkpointMetaData;
@@ -76,20 +75,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		return subtaskState;
 	}
 
-	public long getSynchronousDurationMillis() {
-		return checkpointMetaData.getSyncDurationMillis();
-	}
-
-	public long getAsynchronousDurationMillis() {
-		return checkpointMetaData.getAsyncDurationMillis();
-	}
-
-	public long getBytesBufferedInAlignment() {
-		return checkpointMetaData.getBytesBufferedInAlignment();
-	}
-
-	public long getAlignmentDurationNanos() {
-		return checkpointMetaData.getAlignmentDurationNanos();
+	public CheckpointMetaData getCheckpointMetaData() {
+		return checkpointMetaData;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 463c2ae..daacbfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -83,6 +82,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
@@ -131,7 +131,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			// nothing should be happening
@@ -185,7 +184,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			// nothing should be happening
@@ -237,7 +235,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			// nothing should be happening
@@ -290,7 +287,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -389,7 +385,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -507,7 +502,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -654,7 +648,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -789,7 +782,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -911,7 +903,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			// trigger a checkpoint, partially acknowledged
@@ -980,7 +971,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1046,7 +1036,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1136,28 +1125,28 @@ public class CheckpointCoordinatorTest {
 			final AtomicInteger numCalls = new AtomicInteger();
 
 			final Execution execution = triggerVertex.getCurrentExecutionAttempt();
-			
+
 			doAnswer(new Answer<Void>() {
-				
+
 				private long lastId = -1;
 				private long lastTs = -1;
-				
+
 				@Override
 				public Void answer(InvocationOnMock invocation) throws Throwable {
 					long id = (Long) invocation.getArguments()[0];
 					long ts = (Long) invocation.getArguments()[1];
-					
+
 					assertTrue(id > lastId);
 					assertTrue(ts >= lastTs);
 					assertTrue(ts >= start);
-					
+
 					lastId = id;
 					lastTs = ts;
 					numCalls.incrementAndGet();
 					return null;
 				}
 			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
-			
+
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
 				10,        // periodic interval is 10 ms
@@ -1171,22 +1160,21 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
-			
+
 			coord.startCheckpointScheduler();
-			
+
 			long timeout = System.currentTimeMillis() + 60000;
 			do {
 				Thread.sleep(20);
 			}
 			while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
 			assertTrue(numCalls.get() >= 5);
-			
+
 			coord.stopCheckpointScheduler();
-			
-			
+
+
 			// for 400 ms, no further calls may come.
 			// there may be the case that one trigger was fired and about to
 			// acquire the lock, such that after cancelling it will still do
@@ -1195,7 +1183,7 @@ public class CheckpointCoordinatorTest {
 			Thread.sleep(400);
 			assertTrue(numCallsSoFar == numCalls.get() ||
 					numCallsSoFar+1 == numCalls.get());
-			
+
 			// start another sequence of periodic scheduling
 			numCalls.set(0);
 			coord.startCheckpointScheduler();
@@ -1206,7 +1194,7 @@ public class CheckpointCoordinatorTest {
 			}
 			while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
 			assertTrue(numCalls.get() >= 5);
-			
+
 			coord.stopCheckpointScheduler();
 
 			// for 400 ms, no further calls may come
@@ -1264,7 +1252,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				"dummy-path",
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 		try {
@@ -1313,7 +1300,7 @@ public class CheckpointCoordinatorTest {
 	public void testMaxConcurrentAttempts5() {
 		testMaxConcurrentAttempts(5);
 	}
-	
+
 	@Test
 	public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
 		final JobID jid = new JobID();
@@ -1339,7 +1326,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1475,7 +1461,6 @@ public class CheckpointCoordinatorTest {
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1579,7 +1564,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			coord.startCheckpointScheduler();
@@ -1594,15 +1578,15 @@ public class CheckpointCoordinatorTest {
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
 					(numCalls.get() < maxConcurrentAttempts && now < timeout));
-			
+
 			assertEquals(maxConcurrentAttempts, numCalls.get());
-			
+
 			verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
 					.triggerCheckpoint(anyLong(), anyLong());
-			
+
 			// now, once we acknowledge one checkpoint, it should trigger the next one
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
-			
+
 			// this should have immediately triggered a new checkpoint
 			now = System.currentTimeMillis();
 			timeout = now + 60000;
@@ -1612,11 +1596,11 @@ public class CheckpointCoordinatorTest {
 			while (numCalls.get() < maxConcurrentAttempts + 1 && now < timeout);
 
 			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
-			
+
 			// no further checkpoints should happen
 			Thread.sleep(200);
 			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
-			
+
 			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
@@ -1653,7 +1637,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			coord.startCheckpointScheduler();
@@ -1668,7 +1651,7 @@ public class CheckpointCoordinatorTest {
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
 					(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
-			
+
 			// validate that the pending checkpoints are there
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
 			assertNotNull(coord.getPendingCheckpoints().get(1L));
@@ -1684,14 +1667,14 @@ public class CheckpointCoordinatorTest {
 			do {
 				Thread.sleep(20);
 			}
-			while (coord.getPendingCheckpoints().get(4L) == null && 
+			while (coord.getPendingCheckpoints().get(4L) == null &&
 					System.currentTimeMillis() < newTimeout);
-			
+
 			// do the final check
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
 			assertNotNull(coord.getPendingCheckpoints().get(3L));
 			assertNotNull(coord.getPendingCheckpoints().get(4L));
-			
+
 			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
@@ -1699,7 +1682,7 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPeriodicSchedulingWithInactiveTasks() {
 		try {
@@ -1722,7 +1705,7 @@ public class CheckpointCoordinatorTest {
 							return currentState.get();
 						}
 					});
-			
+
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
 				10,        // periodic interval is 10 ms
@@ -1736,24 +1719,23 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
-			
+
 			coord.startCheckpointScheduler();
 
 			// no checkpoint should have started so far
 			Thread.sleep(200);
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			
+
 			// now move the state to RUNNING
 			currentState.set(ExecutionState.RUNNING);
-			
+
 			// the coordinator should start checkpointing now
 			final long timeout = System.currentTimeMillis() + 10000;
 			do {
 				Thread.sleep(20);
 			}
-			while (System.currentTimeMillis() < timeout && 
+			while (System.currentTimeMillis() < timeout &&
 					coord.getNumberOfPendingCheckpoints() == 0);
 
 			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
@@ -1789,7 +1771,6 @@ public class CheckpointCoordinatorTest {
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
@@ -1843,7 +1824,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1887,7 +1867,7 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = 
+		ExecutionVertex[] arrayExecutionVertices =
 				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
@@ -1904,7 +1884,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		// trigger the checkpoint
@@ -1922,7 +1901,7 @@ public class CheckpointCoordinatorTest {
 			ChainedStateHandle<OperatorStateHandle> partitionableState = generateChainedPartitionableStateHandle(jobVertexID1, index, 2, 8, false);
 			KeyGroupsStateHandle partitionedKeyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
 
-			SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -1936,7 +1915,7 @@ public class CheckpointCoordinatorTest {
 			ChainedStateHandle<StreamStateHandle> nonPartitionedState = generateStateForVertex(jobVertexID2, index);
 			ChainedStateHandle<OperatorStateHandle> partitionableState = generateChainedPartitionableStateHandle(jobVertexID2, index, 2, 8, false);
 			KeyGroupsStateHandle partitionedKeyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
-			SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2010,7 +1989,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		// trigger the checkpoint
@@ -2026,7 +2004,7 @@ public class CheckpointCoordinatorTest {
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
-			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2040,7 +2018,7 @@ public class CheckpointCoordinatorTest {
 		for (int index = 0; index < jobVertex2.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID2, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
-			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2109,7 +2087,7 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = 
+		ExecutionVertex[] arrayExecutionVertices =
 				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
@@ -2126,7 +2104,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		// trigger the checkpoint
@@ -2146,7 +2123,7 @@ public class CheckpointCoordinatorTest {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
 					jobVertexID1, keyGroupPartitions1.get(index), false);
 
-			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2163,7 +2140,7 @@ public class CheckpointCoordinatorTest {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
 					jobVertexID2, keyGroupPartitions2.get(index), false);
 
-			SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null, 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2245,7 +2222,7 @@ public class CheckpointCoordinatorTest {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-		ExecutionVertex[] arrayExecutionVertices = 
+		ExecutionVertex[] arrayExecutionVertices =
 				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
@@ -2262,7 +2239,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		// trigger the checkpoint
@@ -2285,7 +2261,7 @@ public class CheckpointCoordinatorTest {
 			KeyGroupsStateHandle keyedStateRaw = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);
 
 
-			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw , 0);
+			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2307,7 +2283,7 @@ public class CheckpointCoordinatorTest {
 			expectedOpStatesRaw.add(opStateRaw);
 			SubtaskState checkpointStateHandles =
 					new SubtaskState(new ChainedStateHandle<>(
-							Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw, 0);
+							Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
 					jid,
 					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2394,7 +2370,6 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				"fake-directory",
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -2771,7 +2746,6 @@ public class CheckpointCoordinatorTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		// Periodic
@@ -2915,4 +2889,76 @@ public class CheckpointCoordinatorTest {
 		Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
 		Assert.assertEquals(expected, actual);
 	}
+
+	/**
+	 * Tests that the pending checkpoint stats callbacks are created.
+	 */
+	@Test
+	public void testCheckpointStatsTrackerPendingCheckpointCallback() {
+		final long timestamp = System.currentTimeMillis();
+		ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			new JobID(),
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			Executors.directExecutor());
+
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		coord.setCheckpointStatsTracker(tracker);
+
+		when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
+			.thenReturn(mock(PendingCheckpointStats.class));
+
+		// Trigger a checkpoint and verify callback
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+		verify(tracker, times(1))
+			.reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forStandardCheckpoint()));
+	}
+
+	/**
+	 * Tests that the restore callbacks are called if registered.
+	 */
+	@Test
+	public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
+		ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
+		store.addCheckpoint(new CompletedCheckpoint(new JobID(), 0, 0, 0, Collections.<JobVertexID, TaskState>emptyMap()));
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			new JobID(),
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new StandaloneCheckpointIDCounter(),
+			store,
+			null,
+			Executors.directExecutor());
+
+		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+		coord.setCheckpointStatsTracker(tracker);
+
+		assertTrue(coord.restoreLatestCheckpointedState(Collections.<JobVertexID, ExecutionJobVertex>emptyMap(), false, true));
+
+		verify(tracker, times(1))
+			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index 11bddb9..fb3bd65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -85,4 +85,31 @@ public class CheckpointPropertiesTest {
 		assertFalse(props.discardOnJobFailed());
 		assertFalse(props.discardOnJobSuspended());
 	}
+
+	/**
+	 * Tests the isSavepoint utility works as expected.
+	 */
+	@Test
+	public void testIsSavepoint() throws Exception {
+		{
+			CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
+			assertFalse(CheckpointProperties.isSavepoint(props));
+		}
+
+		{
+			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
+			assertFalse(CheckpointProperties.isSavepoint(props));
+		}
+
+		{
+			CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false);
+			assertFalse(CheckpointProperties.isSavepoint(props));
+		}
+
+		{
+			CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+			assertTrue(CheckpointProperties.isSavepoint(props));
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7cea130..0e20ebc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -109,7 +108,6 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			// create ourselves a checkpoint with state
@@ -119,7 +117,7 @@ public class CheckpointStateRestoreTest {
 			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
 			final long checkpointId = pending.getCheckpointId();
 
-			SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null, 0L);
+			SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
@@ -185,7 +183,6 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				new DisabledCheckpointStatsTracker(),
 				Executors.directExecutor());
 
 			try {
@@ -241,7 +238,6 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest