You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/10 08:48:46 UTC
[10/11] flink git commit: [FLINK-4410] [runtime] Rework checkpoint
stats tracking
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e7df5bc..1d97e12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,6 +35,7 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,9 +73,6 @@ public class PendingCheckpoint {
/** Set of acknowledged tasks */
private final Set<ExecutionAttemptID> acknowledgedTasks;
- /** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
- private final boolean isPeriodic;
-
/**
* The checkpoint properties. If the checkpoint should be persisted
* externally, it happens in {@link #finalizeCheckpoint()}.
@@ -93,6 +91,10 @@ public class PendingCheckpoint {
private boolean discarded;
+ /** Optional stats tracker callback. */
+ @Nullable
+ private PendingCheckpointStats statsCallback;
+
// --------------------------------------------------------------------------------------------
public PendingCheckpoint(
@@ -100,7 +102,6 @@ public class PendingCheckpoint {
long checkpointId,
long checkpointTimestamp,
Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
- boolean isPeriodic,
CheckpointProperties props,
String targetDirectory,
Executor executor) {
@@ -108,7 +109,6 @@ public class PendingCheckpoint {
this.checkpointId = checkpointId;
this.checkpointTimestamp = checkpointTimestamp;
this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
- this.isPeriodic = isPeriodic;
this.taskStates = new HashMap<>();
this.props = checkNotNull(props);
this.targetDirectory = targetDirectory;
@@ -163,10 +163,6 @@ public class PendingCheckpoint {
return discarded;
}
- boolean isPeriodic() {
- return isPeriodic;
- }
-
/**
* Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
* of newer checkpoints in progress.
@@ -186,6 +182,15 @@ public class PendingCheckpoint {
return targetDirectory;
}
+ /**
+ * Sets the callback for tracking this pending checkpoint.
+ *
+ * <p>The callback is optional. Passing <code>null</code> disables stats
+ * reporting; the reporting paths check the callback for <code>null</code>
+ * before using it.
+ *
+ * @param trackerCallback Callback for collecting subtask stats, or <code>null</code>.
+ */
+ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
+ // No checkNotNull here: both the parameter and the field are @Nullable.
+ this.statsCallback = trackerCallback;
+ }
+
// ------------------------------------------------------------------------
// Progress and Completion
// ------------------------------------------------------------------------
@@ -227,6 +232,13 @@ public class PendingCheckpoint {
onCompletionPromise.complete(completed);
+ if (statsCallback != null) {
+ // Finalize the statsCallback and give the completed checkpoint a
+ // callback for discards.
+ CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
+ completed.setDiscardCallback(discardCallback);
+ }
+
dispose(false);
return completed;
@@ -238,14 +250,15 @@ public class PendingCheckpoint {
*
* @param executionAttemptId of the acknowledged task
* @param subtaskState of the acknowledged task
+ * @param checkpointMetaData Checkpoint meta data
* @return TaskAcknowledgeResult of the operation
*/
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
- SubtaskState subtaskState) {
+ SubtaskState subtaskState,
+ CheckpointMetaData checkpointMetaData) {
synchronized (lock) {
-
if (discarded) {
return TaskAcknowledgeResult.DISCARDED;
}
@@ -262,10 +275,12 @@ public class PendingCheckpoint {
acknowledgedTasks.add(executionAttemptId);
}
- if (null != subtaskState) {
+ JobVertexID jobVertexID = vertex.getJobvertexId();
+ int subtaskIndex = vertex.getParallelSubtaskIndex();
+ long ackTimestamp = System.currentTimeMillis();
- JobVertexID jobVertexID = vertex.getJobvertexId();
- int subtaskIndex = vertex.getParallelSubtaskIndex();
+ long stateSize = 0;
+ if (null != subtaskState) {
TaskState taskState = taskStates.get(jobVertexID);
if (null == taskState) {
@@ -292,14 +307,30 @@ public class PendingCheckpoint {
taskStates.put(jobVertexID, taskState);
}
- long duration = System.currentTimeMillis() - checkpointTimestamp;
- subtaskState.setDuration(duration);
-
taskState.putState(subtaskIndex, subtaskState);
+ stateSize = subtaskState.getStateSize();
}
++numAcknowledgedTasks;
+ if (statsCallback != null) {
+ CheckpointMetrics metrics = checkpointMetaData.getMetrics();
+
+ // Do this in millis because the web frontend works with them
+ long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
+
+ SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
+ subtaskIndex,
+ ackTimestamp,
+ stateSize,
+ metrics.getSyncDurationMillis(),
+ metrics.getAsyncDurationMillis(),
+ metrics.getBytesBufferedInAlignment(),
+ alignmentDurationMillis);
+
+ statsCallback.reportSubtaskStats(jobVertexID, subtaskStateStats);
+ }
+
return TaskAcknowledgeResult.SUCCESS;
}
}
@@ -323,7 +354,9 @@ public class PendingCheckpoint {
*/
public void abortExpired() {
try {
- onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing"));
+ Exception cause = new Exception("Checkpoint expired before completing");
+ onCompletionPromise.completeExceptionally(cause);
+ reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
@@ -334,12 +367,12 @@ public class PendingCheckpoint {
*/
public void abortSubsumed() {
try {
- if (props.forceCheckpoint()) {
- onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed"));
+ Exception cause = new Exception("Checkpoints has been subsumed");
+ onCompletionPromise.completeExceptionally(cause);
+ reportFailedCheckpoint(cause);
+ if (props.forceCheckpoint()) {
throw new IllegalStateException("Bug: forced checkpoints must never be subsumed");
- } else {
- onCompletionPromise.completeExceptionally(new Exception("Checkpoints has been subsumed"));
}
} finally {
dispose(true);
@@ -348,7 +381,9 @@ public class PendingCheckpoint {
public void abortDeclined() {
try {
- onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)"));
+ Exception cause = new Exception("Checkpoint was declined (tasks not ready)");
+ onCompletionPromise.completeExceptionally(cause);
+ reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
@@ -360,7 +395,9 @@ public class PendingCheckpoint {
*/
public void abortError(Throwable cause) {
try {
- onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause));
+ Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause);
+ onCompletionPromise.completeExceptionally(failure);
+ reportFailedCheckpoint(failure);
} finally {
dispose(true);
}
@@ -393,6 +430,18 @@ public class PendingCheckpoint {
}
}
+ /**
+ * Reports a failed checkpoint with the given optional cause to the stats
+ * callback, if one has been set. Does nothing when no callback is set.
+ *
+ * <p>The failure timestamp is taken at the time of this call.
+ *
+ * @param cause The failure cause or <code>null</code>.
+ */
+ private void reportFailedCheckpoint(@Nullable Exception cause) {
+ if (statsCallback != null) {
+ long failureTimestamp = System.currentTimeMillis();
+ statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
new file mode 100644
index 0000000..e6fa80f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a pending checkpoint that is still in progress.
+ *
+ * <p>This is the starting point for all checkpoint tracking. The life cycle
+ * of instances of this class is tightly coupled to a {@link PendingCheckpoint}
+ * instance, which forwards statistics about acknowledged subtasks
+ * via {@link #reportSubtaskStats(JobVertexID, SubtaskStateStats)}.
+ *
+ * <p>Depending on whether the {@link PendingCheckpoint} is finalized
+ * successfully or aborted, we replace ourselves with a {@link CompletedCheckpointStats}
+ * or {@link FailedCheckpointStats} and notify the {@link CheckpointStatsTracker}.
+ *
+ * <p>The statistics gathered here are all live updated. The running totals
+ * are {@code volatile}, so readers observe recent values without locking.
+ * The updates themselves are plain read-modify-write operations and are not
+ * atomic, so only one thread at a time may report subtask stats.
+ */
+public class PendingCheckpointStats extends AbstractCheckpointStats {
+
+ /** Tracker callback when the pending checkpoint is finalized or aborted. */
+ private final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
+
+ /** The current number of acknowledged subtasks. */
+ private volatile int currentNumAcknowledgedSubtasks;
+
+ /** Current checkpoint state size over all collected subtasks. */
+ private volatile long currentStateSize;
+
+ /** Current buffered bytes during alignment over all collected subtasks. */
+ private volatile long currentAlignmentBuffered;
+
+ /** Stats of the latest acknowledged subtask, or <code>null</code> before the first ack. */
+ @Nullable
+ private volatile SubtaskStateStats latestAcknowledgedSubtask;
+
+ /**
+ * Creates a tracker for a {@link PendingCheckpoint}.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param triggerTimestamp Timestamp when the checkpoint was triggered.
+ * @param props Checkpoint properties of the checkpoint.
+ * @param totalSubtaskCount Total number of subtasks for the checkpoint.
+ * @param taskStats Task stats for each involved operator.
+ * @param trackerCallback Callback for the {@link CheckpointStatsTracker}; must not be null.
+ */
+ PendingCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int totalSubtaskCount,
+ Map<JobVertexID, TaskStateStats> taskStats,
+ CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+
+ super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
+ this.trackerCallback = checkNotNull(trackerCallback);
+ }
+
+ @Override
+ public CheckpointStatsStatus getStatus() {
+ return CheckpointStatsStatus.IN_PROGRESS;
+ }
+
+ @Override
+ public int getNumberOfAcknowledgedSubtasks() {
+ return currentNumAcknowledgedSubtasks;
+ }
+
+ @Override
+ public long getStateSize() {
+ return currentStateSize;
+ }
+
+ @Override
+ public long getAlignmentBuffered() {
+ return currentAlignmentBuffered;
+ }
+
+ /**
+ * Returns the stats of the most recently acknowledged subtask.
+ *
+ * @return Latest acknowledged subtask stats, or <code>null</code> if no
+ * subtask has been acknowledged yet.
+ */
+ @Override
+ @Nullable
+ public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+ return latestAcknowledgedSubtask;
+ }
+
+ // ------------------------------------------------------------------------
+ // Callbacks from the PendingCheckpoint instance
+ // ------------------------------------------------------------------------
+
+ /**
+ * Reports statistics for a single subtask.
+ *
+ * <p>The report only counts if the vertex belongs to this checkpoint and
+ * its {@link TaskStateStats} returns <code>true</code> for the subtask
+ * stats. Only positive alignment buffer sizes are added to the running
+ * total, so unreported values (e.g. <code>-1</code>) do not reduce it.
+ *
+ * <p>Must not be called concurrently. The counter updates below are not
+ * atomic even though the fields are volatile.
+ *
+ * @param jobVertexId ID of the task/operator the subtask belongs to.
+ * @param subtask The statistics for the subtask.
+ * @return <code>true</code> if successfully reported or <code>false</code> otherwise.
+ */
+ boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
+ TaskStateStats taskStateStats = taskStats.get(jobVertexId);
+
+ if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
+ currentNumAcknowledgedSubtasks++;
+ latestAcknowledgedSubtask = subtask;
+
+ currentStateSize += subtask.getStateSize();
+
+ // Skip non-positive values, e.g. -1 when the runtime did not report it.
+ long alignmentBuffered = subtask.getAlignmentBuffered();
+ if (alignmentBuffered > 0) {
+ currentAlignmentBuffered += alignmentBuffered;
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Reports a successfully completed pending checkpoint.
+ *
+ * <p>Builds a {@link CompletedCheckpointStats} snapshot of the current
+ * counters, using a shallow copy of the task stats map, and hands it to
+ * the tracker callback.
+ *
+ * @param externalPath Optional external storage path if checkpoint was externalized.
+ * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal.
+ */
+ CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(@Nullable String externalPath) {
+ CompletedCheckpointStats completed = new CompletedCheckpointStats(
+ checkpointId,
+ triggerTimestamp,
+ props,
+ numberOfSubtasks,
+ new HashMap<>(taskStats),
+ currentNumAcknowledgedSubtasks,
+ currentStateSize,
+ currentAlignmentBuffered,
+ latestAcknowledgedSubtask,
+ externalPath);
+
+ trackerCallback.reportCompletedCheckpoint(completed);
+
+ return completed.getDiscardCallback();
+ }
+
+ /**
+ * Reports a failed pending checkpoint.
+ *
+ * <p>Builds a {@link FailedCheckpointStats} snapshot of the current
+ * counters, using a shallow copy of the task stats map, and hands it to
+ * the tracker callback.
+ *
+ * @param failureTimestamp Timestamp of the failure.
+ * @param cause Optional cause of the failure.
+ */
+ void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
+ FailedCheckpointStats failed = new FailedCheckpointStats(
+ checkpointId,
+ triggerTimestamp,
+ props,
+ numberOfSubtasks,
+ new HashMap<>(taskStats),
+ currentNumAcknowledgedSubtasks,
+ currentStateSize,
+ currentAlignmentBuffered,
+ failureTimestamp,
+ latestAcknowledgedSubtask,
+ cause);
+
+ trackerCallback.reportFailedCheckpoint(failed);
+ }
+
+ @Override
+ public String toString() {
+ return "PendingCheckpoint(id=" + getCheckpointId() + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
new file mode 100644
index 0000000..c21937a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a restored checkpoint.
+ *
+ * <p>Instances are immutable once constructed.
+ */
+public class RestoredCheckpointStats implements Serializable {
+
+ private static final long serialVersionUID = 2305815319666360821L;
+
+ /** ID of the restored checkpoint. */
+ private final long checkpointId;
+
+ /** Properties of the restored checkpoint. */
+ private final CheckpointProperties props;
+
+ /** Timestamp when the checkpoint was restored at the coordinator. */
+ private final long restoreTimestamp;
+
+ /** Optional external path. */
+ @Nullable
+ private final String externalPath;
+
+ /**
+ * Creates a new restored checkpoint stats.
+ *
+ * @param checkpointId ID of the checkpoint.
+ * @param props Checkpoint properties of the checkpoint; must not be null.
+ * @param restoreTimestamp Timestamp when the checkpoint was restored.
+ * @param externalPath Optional external path if persisted externally, or <code>null</code>.
+ */
+ RestoredCheckpointStats(
+ long checkpointId,
+ CheckpointProperties props,
+ long restoreTimestamp,
+ @Nullable String externalPath) {
+
+ this.checkpointId = checkpointId;
+ this.props = checkNotNull(props, "Checkpoint Properties");
+ this.restoreTimestamp = restoreTimestamp;
+ this.externalPath = externalPath;
+ }
+
+ /**
+ * Returns the ID of this checkpoint.
+ *
+ * @return ID of this checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Returns the properties of the restored checkpoint.
+ *
+ * @return Properties of the restored checkpoint.
+ */
+ public CheckpointProperties getProperties() {
+ return props;
+ }
+
+ /**
+ * Returns the timestamp when the checkpoint was restored.
+ *
+ * @return Timestamp when the checkpoint was restored.
+ */
+ public long getRestoreTimestamp() {
+ return restoreTimestamp;
+ }
+
+ /**
+ * Returns the external path if this checkpoint was persisted externally.
+ *
+ * @return External path of this checkpoint or <code>null</code>.
+ */
+ @Nullable
+ public String getExternalPath() {
+ return externalPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index ca51e1a..1393e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -70,39 +70,19 @@ public class SubtaskState implements StateObject {
*/
private final long stateSize;
- /**
- * The duration of the checkpoint (ack timestamp - trigger timestamp).
- */
- private long duration;
-
public SubtaskState(
ChainedStateHandle<StreamStateHandle> legacyOperatorState,
ChainedStateHandle<OperatorStateHandle> managedOperatorState,
ChainedStateHandle<OperatorStateHandle> rawOperatorState,
KeyGroupsStateHandle managedKeyedState,
KeyGroupsStateHandle rawKeyedState) {
- this(legacyOperatorState,
- managedOperatorState,
- rawOperatorState,
- managedKeyedState,
- rawKeyedState,
- 0L);
- }
-
- public SubtaskState(
- ChainedStateHandle<StreamStateHandle> legacyOperatorState,
- ChainedStateHandle<OperatorStateHandle> managedOperatorState,
- ChainedStateHandle<OperatorStateHandle> rawOperatorState,
- KeyGroupsStateHandle managedKeyedState,
- KeyGroupsStateHandle rawKeyedState,
- long duration) {
this.legacyOperatorState = checkNotNull(legacyOperatorState, "State");
this.managedOperatorState = managedOperatorState;
this.rawOperatorState = rawOperatorState;
this.managedKeyedState = managedKeyedState;
this.rawKeyedState = rawKeyedState;
- this.duration = duration;
+
try {
long calculateStateSize = getSizeNullSafe(legacyOperatorState);
calculateStateSize += getSizeNullSafe(managedOperatorState);
@@ -147,10 +127,6 @@ public class SubtaskState implements StateObject {
return stateSize;
}
- public long getDuration() {
- return duration;
- }
-
@Override
public void discardState() throws Exception {
StateUtil.bestEffortDiscardAllStateObjects(
@@ -162,13 +138,8 @@ public class SubtaskState implements StateObject {
rawKeyedState));
}
- public void setDuration(long duration) {
- this.duration = duration;
- }
-
// --------------------------------------------------------------------------------------------
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -183,9 +154,7 @@ public class SubtaskState implements StateObject {
if (stateSize != that.stateSize) {
return false;
}
- if (duration != that.duration) {
- return false;
- }
+
if (legacyOperatorState != null ?
!legacyOperatorState.equals(that.legacyOperatorState)
: that.legacyOperatorState != null) {
@@ -220,7 +189,6 @@ public class SubtaskState implements StateObject {
result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0);
result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0);
result = 31 * result + (int) (stateSize ^ (stateSize >>> 32));
- result = 31 * result + (int) (duration ^ (duration >>> 32));
return result;
}
@@ -233,7 +201,6 @@ public class SubtaskState implements StateObject {
", keyedStateFromBackend=" + managedKeyedState +
", keyedStateHandleFromStream=" + rawKeyedState +
", stateSize=" + stateSize +
- ", duration=" + duration +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
new file mode 100644
index 0000000..3a66032
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Statistics for a single subtask that is part of a checkpoint.
+ *
+ * <p>Collects data that is spread over different places:
+ * {@link CheckpointMetaData},
+ * {@link SubtaskState}, and
+ * {@link PendingCheckpoint}.
+ *
+ * <p>This is the smallest immutable unit of the stats.
+ */
+public class SubtaskStateStats {
+
+ /** Index of this sub task. */
+ private final int subtaskIndex;
+
+ /**
+ * Timestamp when the ack from this sub task was received at the
+ * coordinator.
+ */
+ private final long ackTimestamp;
+
+ /** Size of the checkpointed state at this subtask. */
+ private final long stateSize;
+
+ /** Checkpoint duration at the operator (sync part) in milliseconds. */
+ private final long syncCheckpointDuration;
+
+ /** Checkpoint duration at the operator (async part) in milliseconds. */
+ private final long asyncCheckpointDuration;
+
+ /** Number of buffered bytes during alignment. */
+ private final long alignmentBuffered;
+
+ /**
+ * Alignment duration in milliseconds.
+ *
+ * <p>NOTE(review): PendingCheckpoint#acknowledgeTask derives this value from
+ * nanoseconds using integer division by 1_000_000. An unreported value of -1
+ * nanoseconds would therefore arrive here as 0 rather than -1. Confirm whether
+ * the -1 case documented on {@link #getAlignmentDuration()} is reachable.
+ */
+ private final long alignmentDuration;
+
+ /**
+ * Creates the stats for a single subtask.
+ *
+ * @param subtaskIndex Index of the subtask; must not be negative.
+ * @param ackTimestamp Timestamp when the acknowledgement of this subtask was received at the coordinator.
+ * @param stateSize Size of the checkpointed state at this subtask; must not be negative.
+ * @param syncCheckpointDuration Checkpoint duration at the task (synchronous part) in milliseconds.
+ * @param asyncCheckpointDuration Checkpoint duration at the task (asynchronous part) in milliseconds.
+ * @param alignmentBuffered Bytes buffered during stream alignment (for exactly-once only).
+ * @param alignmentDuration Duration of the stream alignment in milliseconds (for exactly-once only).
+ */
+ SubtaskStateStats(
+ int subtaskIndex,
+ long ackTimestamp,
+ long stateSize,
+ long syncCheckpointDuration,
+ long asyncCheckpointDuration,
+ long alignmentBuffered,
+ long alignmentDuration) {
+
+ // Validate everything up front, before any field is assigned.
+ checkArgument(subtaskIndex >= 0, "Negative subtask index");
+ checkArgument(stateSize >= 0, "Negative state size");
+
+ this.subtaskIndex = subtaskIndex;
+ this.stateSize = stateSize;
+ this.ackTimestamp = ackTimestamp;
+ this.syncCheckpointDuration = syncCheckpointDuration;
+ this.asyncCheckpointDuration = asyncCheckpointDuration;
+ this.alignmentBuffered = alignmentBuffered;
+ this.alignmentDuration = alignmentDuration;
+ }
+
+ /**
+ * Returns the subtask index.
+ *
+ * @return Subtask index.
+ */
+ public int getSubtaskIndex() {
+ return subtaskIndex;
+ }
+
+ /**
+ * Returns the size of the checkpointed state at this subtask.
+ *
+ * @return Checkpoint state size of the sub task.
+ */
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ /**
+ * Returns the timestamp when the acknowledgement of this subtask was
+ * received at the coordinator.
+ *
+ * @return ACK timestamp at the coordinator.
+ */
+ public long getAckTimestamp() {
+ return ackTimestamp;
+ }
+
+ /**
+ * Computes the duration since the given trigger timestamp.
+ *
+ * <p>If the trigger timestamp is greater than the ACK timestamp, this
+ * returns <code>0</code>.
+ *
+ * @param triggerTimestamp Trigger timestamp of the checkpoint.
+ * @return Duration since the given trigger timestamp.
+ */
+ public long getEndToEndDuration(long triggerTimestamp) {
+ return Math.max(0, ackTimestamp - triggerTimestamp);
+ }
+
+ /**
+ * Returns the duration of the synchronous part of the checkpoint in milliseconds.
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Duration of the synchronous part of the checkpoint or <code>-1</code>.
+ */
+ public long getSyncCheckpointDuration() {
+ return syncCheckpointDuration;
+ }
+
+ /**
+ * Returns the duration of the asynchronous part of the checkpoint in milliseconds.
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Duration of the asynchronous part of the checkpoint or <code>-1</code>.
+ */
+ public long getAsyncCheckpointDuration() {
+ return asyncCheckpointDuration;
+ }
+
+ /**
+ * Returns the number of bytes buffered during stream alignment (for
+ * exactly-once only).
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Number of bytes buffered during stream alignment or <code>-1</code>.
+ */
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ /**
+ * Returns the duration of the stream alignment in milliseconds (for
+ * exactly-once only).
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Duration of the stream alignment or <code>-1</code>.
+ */
+ public long getAlignmentDuration() {
+ return alignmentDuration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
new file mode 100644
index 0000000..fc118d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Statistics for a single task/operator that gathers all statistics of its
+ * subtasks and provides summary statistics about all subtasks.
+ */
+public class TaskStateStats {
+
+ /** ID of the task the stats belong to. */
+ private final JobVertexID jobVertexId;
+
+ /** Stats for each subtask, indexed by subtask index. */
+ private final SubtaskStateStats[] subtaskStats;
+
+ /** A summary of the subtask stats. */
+ private final TaskStateStatsSummary summaryStats = new TaskStateStatsSummary();
+
+ /** Number of acknowledged subtasks. */
+ private int numAcknowledgedSubtasks;
+
+ /** The most recently reported subtask stats (<code>null</code> until the first report). */
+ @Nullable
+ private SubtaskStateStats latestAckedSubtaskStats;
+
+ TaskStateStats(JobVertexID jobVertexId, int numSubtasks) {
+ this.jobVertexId = checkNotNull(jobVertexId, "JobVertexID");
+ checkArgument(numSubtasks > 0, "Number of subtasks <= 0");
+ this.subtaskStats = new SubtaskStateStats[numSubtasks];
+ }
+
+ /**
+ * Hands in the stats for a subtask; ignored if its index is out of range or was already reported.
+ * @param subtask Stats for the sub task to hand in.
+ * @return <code>true</code> if the stats were recorded, <code>false</code> if they were ignored.
+ */
+ boolean reportSubtaskStats(SubtaskStateStats subtask) {
+ checkNotNull(subtask, "Subtask stats");
+ int subtaskIndex = subtask.getSubtaskIndex();
+
+ if (subtaskIndex < 0 || subtaskIndex >= subtaskStats.length) {
+ return false;
+ }
+
+ if (subtaskStats[subtaskIndex] == null) {
+ subtaskStats[subtaskIndex] = subtask;
+
+ latestAckedSubtaskStats = subtask;
+ numAcknowledgedSubtasks++;
+
+ summaryStats.updateSummary(subtask);
+
+ return true;
+ } else {
+ return false; // stats for this subtask index were already reported
+ }
+ }
+
+ /**
+ * Returns the ID of the operator the statistics belong to.
+ *
+ * @return ID of the operator the statistics belong to.
+ */
+ public JobVertexID getJobVertexId() {
+ return jobVertexId;
+ }
+
+ /**
+ * Returns the number of subtasks.
+ *
+ * @return Number of subtasks.
+ */
+ public int getNumberOfSubtasks() {
+ return subtaskStats.length;
+ }
+
+ /**
+ * Returns the number of acknowledged subtasks.
+ *
+ * @return Number of acknowledged subtasks.
+ */
+ public int getNumberOfAcknowledgedSubtasks() {
+ return numAcknowledgedSubtasks;
+ }
+
+ /**
+ * Returns the latest acknowledged subtask stats or <code>null</code>
+ * if none was acknowledged yet.
+ *
+ * @return The latest acknowledged subtask stats.
+ */
+ @Nullable
+ public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
+ return latestAckedSubtaskStats;
+ }
+
+ /**
+ * Returns the ack timestamp of the latest acknowledged subtask or
+ * <code>-1</code> if none was acknowledged yet.
+ *
+ * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code>.
+ */
+ public long getLatestAckTimestamp() {
+ SubtaskStateStats subtask = latestAckedSubtaskStats;
+ if (subtask != null) {
+ return subtask.getAckTimestamp();
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the total checkpoint state size over all subtasks.
+ *
+ * @return Total checkpoint state size over all subtasks.
+ */
+ public long getStateSize() {
+ return summaryStats.getStateSizeStats().getSum();
+ }
+
+ /**
+ * Returns the total buffered bytes during alignment over all subtasks.
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Total buffered bytes during alignment over all subtasks.
+ */
+ public long getAlignmentBuffered() {
+ return summaryStats.getAlignmentBufferedStats().getSum();
+ }
+
+ /**
+ * Returns the duration of this checkpoint at the task/operator calculated
+ * as the time since triggering until the latest acknowledged subtask
+ * or <code>-1</code> if no subtask was acknowledged yet.
+ * @param triggerTimestamp Timestamp at which the checkpoint was triggered.
+ * @return Duration of this checkpoint at the task/operator or <code>-1</code> if no subtask was acknowledged yet.
+ */
+ public long getEndToEndDuration(long triggerTimestamp) {
+ SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+ if (subtask != null) {
+ return Math.max(0, subtask.getAckTimestamp() - triggerTimestamp);
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the stats for all subtasks.
+ *
+ * <p>Elements of the returned array are <code>null</code> if no stats are
+ * available yet for the respective subtask.
+ *
+ * <p>Note: The returned array must not be modified.
+ *
+ * @return Array of subtask stats (elements are <code>null</code> if no stats available yet).
+ */
+ public SubtaskStateStats[] getSubtaskStats() {
+ return subtaskStats;
+ }
+
+ /**
+ * Returns the summary of the subtask stats.
+ *
+ * @return Summary of the subtask stats.
+ */
+ public TaskStateStatsSummary getSummaryStats() {
+ return summaryStats;
+ }
+
+ /**
+ * Summary of the subtask stats of a single task/operator.
+ */
+ public static class TaskStateStatsSummary {
+
+ private final MinMaxAvgStats stateSize = new MinMaxAvgStats();
+ private final MinMaxAvgStats ackTimestamp = new MinMaxAvgStats();
+ private final MinMaxAvgStats syncCheckpointDuration = new MinMaxAvgStats();
+ private final MinMaxAvgStats asyncCheckpointDuration = new MinMaxAvgStats();
+ private final MinMaxAvgStats alignmentBuffered = new MinMaxAvgStats();
+ private final MinMaxAvgStats alignmentDuration = new MinMaxAvgStats();
+
+ /**
+ * Updates the summary with the given subtask.
+ *
+ * @param subtaskStats Subtask stats to update the summary with.
+ */
+ void updateSummary(SubtaskStateStats subtaskStats) {
+ stateSize.add(subtaskStats.getStateSize());
+ ackTimestamp.add(subtaskStats.getAckTimestamp());
+ syncCheckpointDuration.add(subtaskStats.getSyncCheckpointDuration());
+ asyncCheckpointDuration.add(subtaskStats.getAsyncCheckpointDuration());
+ alignmentBuffered.add(subtaskStats.getAlignmentBuffered());
+ alignmentDuration.add(subtaskStats.getAlignmentDuration());
+ }
+
+ /**
+ * Returns the summary stats for the state size.
+ *
+ * @return Summary stats for the state size.
+ */
+ public MinMaxAvgStats getStateSizeStats() {
+ return stateSize;
+ }
+
+ /**
+ * Returns the summary stats for the ACK timestamps.
+ *
+ * @return Summary stats for the ACK timestamps.
+ */
+ public MinMaxAvgStats getAckTimestampStats() {
+ return ackTimestamp;
+ }
+
+ /**
+ * Returns the summary stats for the sync checkpoint duration.
+ *
+ * @return Summary stats for the sync checkpoint duration.
+ */
+ public MinMaxAvgStats getSyncCheckpointDurationStats() {
+ return syncCheckpointDuration;
+ }
+
+ /**
+ * Returns the summary stats for the async checkpoint duration.
+ *
+ * @return Summary stats for the async checkpoint duration.
+ */
+ public MinMaxAvgStats getAsyncCheckpointDurationStats() {
+ return asyncCheckpointDuration;
+ }
+
+ /**
+ * Returns the summary stats for the buffered bytes during alignments.
+ *
+ * @return Summary stats for the buffered bytes during alignment.
+ */
+ public MinMaxAvgStats getAlignmentBufferedStats() {
+ return alignmentBuffered;
+ }
+
+ /**
+ * Returns the summary stats for the alignment duration.
+ *
+ * @return Summary stats for the duration of the alignment.
+ */
+ public MinMaxAvgStats getAlignmentDurationStats() {
+ return alignmentDuration;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 4d16c13..48324ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -126,7 +126,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
- dos.writeLong(subtaskState.getDuration());
+ dos.writeLong(-1);
ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
@@ -160,12 +160,11 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
serializeKeyGroupStateHandle(keyedStateStream, dos);
-
}
private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
-
- long duration = dis.readLong();
+ // Duration field has been removed from SubtaskState
+ long ignoredDuration = dis.readLong();
int len = dis.readInt();
List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
@@ -207,8 +206,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
operatorStateBackendChain,
operatorStateStreamChain,
keyedStateBackend,
- keyedStateStream,
- duration);
+ keyedStateStream);
}
private static void serializeKeyGroupStateHandle(
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index e7fe1b0..3490dc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
index 92af0c8..43b5889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
/**
* Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
@@ -70,16 +68,10 @@ public interface AccessExecutionJobVertex {
ExecutionState getAggregateState();
/**
- * Returns the {@link OperatorCheckpointStats} for this job vertex.
- *
- * @return checkpoint stats for this job vertex.
- */
- Option<OperatorCheckpointStats> getCheckpointStats();
-
- /**
* Returns the aggregated user-defined accumulators as strings.
*
* @return aggregated user-defined accumulators as strings.
*/
StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 0bd5319..440ecda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.SerializedValue;
@@ -77,7 +76,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
private final ArchivedExecutionConfig archivedExecutionConfig;
private final boolean isStoppable;
private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
- private final ArchivedCheckpointStatsTracker tracker;
+ private final CheckpointStatsTracker tracker;
public ArchivedExecutionGraph(
JobID jobID,
@@ -92,7 +91,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
Map<String, SerializedValue<Object>> serializedUserAccumulators,
ArchivedExecutionConfig executionConfig,
boolean isStoppable,
- ArchivedCheckpointStatsTracker tracker
+ CheckpointStatsTracker tracker
) {
this.jobID = jobID;
this.jobName = jobName;
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index e30f45a..c744907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -18,11 +18,8 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
import java.io.Serializable;
@@ -41,7 +38,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
private final int maxParallelism;
- private final Option<OperatorCheckpointStats> checkpointStats;
private final StringifiedAccumulatorResult[] archivedUserAccumulators;
public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
@@ -56,10 +52,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
this.name = jobVertex.getJobVertex().getName();
this.parallelism = jobVertex.getParallelism();
this.maxParallelism = jobVertex.getMaxParallelism();
- CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
- checkpointStats = tracker != null
- ? tracker.getOperatorStats(this.id)
- : Option.<OperatorCheckpointStats>empty();
}
// --------------------------------------------------------------------------------------------
@@ -106,12 +98,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
// --------------------------------------------------------------------------------------------
@Override
- public Option<OperatorCheckpointStats> getCheckpointStats() {
- return checkpointStats;
- }
-
- @Override
public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
return archivedUserAccumulators;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cbb4c7e..058872a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -33,16 +34,12 @@ import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -63,8 +60,6 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -77,7 +72,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -369,7 +363,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
LOG.error("Error while shutting down checkpointer.");
}
- checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
+ checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
@@ -385,9 +379,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointIDCounter,
checkpointStore,
checkpointDir,
- checkpointStatsTracker,
ioExecutor);
+ checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
+
// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
@@ -1291,28 +1286,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
@Override
public ArchivedExecutionGraph archive() {
- Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>();
Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
for (ExecutionJobVertex task : verticesInCreationOrder) {
ArchivedExecutionJobVertex archivedTask = task.archive();
archivedVerticesInCreationOrder.add(archivedTask);
archivedTasks.put(task.getJobVertexId(), archivedTask);
- Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats();
- if (statsOption.isDefined()) {
- operatorStats.put(task.getJobVertexId(), statsOption.get());
- }
}
- Option<JobCheckpointStats> jobStats;
- if (getCheckpointStatsTracker() == null) {
- jobStats = Option.empty();
- } else {
- jobStats = getCheckpointStatsTracker().getJobStats();
- }
-
- ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats);
-
Map<String, SerializedValue<Object>> serializedUserAccumulators;
try {
serializedUserAccumulators = getAccumulatorsSerialized();
@@ -1334,7 +1315,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
serializedUserAccumulators,
getArchivedExecutionConfig(),
isStoppable(),
- statsTracker
- );
+ getCheckpointStatsTracker());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a1d7385..386f202 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,9 +28,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -176,24 +174,18 @@ public class ExecutionGraphBuilder {
throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
}
- // Checkpoint stats tracker
- boolean isStatsDisabled = jobManagerConfig.getBoolean(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+ // Maximum number of remembered checkpoints
+ int historySize = jobManagerConfig.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
- CheckpointStatsTracker checkpointStatsTracker;
- if (isStatsDisabled) {
- checkpointStatsTracker = new DisabledCheckpointStatsTracker();
- }
- else {
- int historySize = jobManagerConfig.getInteger(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-
- checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
- }
+ CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
+ historySize,
+ ackVertices,
+ snapshotSettings,
+ metrics);
- /** The default directory for externalized checkpoints. */
+ // The default directory for externalized checkpoints
String externalizedCheckpointsDir = jobManagerConfig.getString(
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7f2545c..fbab572 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -30,8 +30,6 @@ import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -47,7 +45,6 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
-import scala.Option;
import java.io.IOException;
import java.util.ArrayList;
@@ -289,16 +286,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return getAggregateJobVertexState(num, parallelism);
}
-
- @Override
- public Option<OperatorCheckpointStats> getCheckpointStats() {
- CheckpointStatsTracker tracker = getGraph().getCheckpointStatsTracker();
- if (tracker == null) {
- return Option.empty();
- } else {
- return tracker.getOperatorStats(getJobVertexId());
- }
- }
//---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 7d6b36e..561ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -49,6 +49,15 @@ public class JobSnapshottingSettings implements java.io.Serializable {
/** Settings for externalized checkpoints. */
private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+
+ /**
+ * Flag indicating whether exactly once checkpoint mode has been configured.
+ * If <code>false</code>, at least once mode has been configured. This is
+ * not a necessary attribute, because the checkpointing mode is only relevant
+ * for the stream tasks, but we expose it here to forward it to the web runtime
+ * UI.
+ */
+ private final boolean isExactlyOnce;
public JobSnapshottingSettings(
List<JobVertexID> verticesToTrigger,
@@ -58,7 +67,8 @@ public class JobSnapshottingSettings implements java.io.Serializable {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
- ExternalizedCheckpointSettings externalizedCheckpointSettings) {
+ ExternalizedCheckpointSettings externalizedCheckpointSettings,
+ boolean isExactlyOnce) {
// sanity checks
if (checkpointInterval < 1 || checkpointTimeout < 1 ||
@@ -74,6 +84,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+ this.isExactlyOnce = isExactlyOnce;
}
// --------------------------------------------------------------------------------------------
@@ -110,6 +121,10 @@ public class JobSnapshottingSettings implements java.io.Serializable {
return externalizedCheckpointSettings;
}
+ public boolean isExactlyOnce() {
+ return this.isExactlyOnce;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index c63bac5..7ec3efa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -37,7 +37,6 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
private static final long serialVersionUID = -7606214777192401493L;
-
private final SubtaskState subtaskState;
private final CheckpointMetaData checkpointMetaData;
@@ -76,20 +75,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
return subtaskState;
}
- public long getSynchronousDurationMillis() {
- return checkpointMetaData.getSyncDurationMillis();
- }
-
- public long getAsynchronousDurationMillis() {
- return checkpointMetaData.getAsyncDurationMillis();
- }
-
- public long getBytesBufferedInAlignment() {
- return checkpointMetaData.getBytesBufferedInAlignment();
- }
-
- public long getAlignmentDurationNanos() {
- return checkpointMetaData.getAlignmentDurationNanos();
+ public CheckpointMetaData getCheckpointMetaData() {
+ return this.checkpointMetaData;
 }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 463c2ae..daacbfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -83,6 +82,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
@@ -131,7 +131,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// nothing should be happening
@@ -185,7 +184,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// nothing should be happening
@@ -237,7 +235,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// nothing should be happening
@@ -290,7 +287,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -389,7 +385,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -507,7 +502,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -654,7 +648,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -789,7 +782,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -911,7 +903,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// trigger a checkpoint, partially acknowledged
@@ -980,7 +971,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1046,7 +1036,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1136,28 +1125,28 @@ public class CheckpointCoordinatorTest {
final AtomicInteger numCalls = new AtomicInteger();
final Execution execution = triggerVertex.getCurrentExecutionAttempt();
-
+
doAnswer(new Answer<Void>() {
-
+
private long lastId = -1;
private long lastTs = -1;
-
+
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
long id = (Long) invocation.getArguments()[0];
long ts = (Long) invocation.getArguments()[1];
-
+
assertTrue(id > lastId);
assertTrue(ts >= lastTs);
assertTrue(ts >= start);
-
+
lastId = id;
lastTs = ts;
numCalls.incrementAndGet();
return null;
}
}).when(execution).triggerCheckpoint(anyLong(), anyLong());
-
+
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
10, // periodic interval is 10 ms
@@ -1171,22 +1160,21 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
-
+
coord.startCheckpointScheduler();
-
+
long timeout = System.currentTimeMillis() + 60000;
do {
Thread.sleep(20);
}
while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
assertTrue(numCalls.get() >= 5);
-
+
coord.stopCheckpointScheduler();
-
-
+
+
// for 400 ms, no further calls may come.
// there may be the case that one trigger was fired and about to
// acquire the lock, such that after cancelling it will still do
@@ -1195,7 +1183,7 @@ public class CheckpointCoordinatorTest {
Thread.sleep(400);
assertTrue(numCallsSoFar == numCalls.get() ||
numCallsSoFar+1 == numCalls.get());
-
+
// start another sequence of periodic scheduling
numCalls.set(0);
coord.startCheckpointScheduler();
@@ -1206,7 +1194,7 @@ public class CheckpointCoordinatorTest {
}
while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
assertTrue(numCalls.get() >= 5);
-
+
coord.stopCheckpointScheduler();
// for 400 ms, no further calls may come
@@ -1264,7 +1252,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
"dummy-path",
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
try {
@@ -1313,7 +1300,7 @@ public class CheckpointCoordinatorTest {
public void testMaxConcurrentAttempts5() {
testMaxConcurrentAttempts(5);
}
-
+
@Test
public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
final JobID jid = new JobID();
@@ -1339,7 +1326,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1475,7 +1461,6 @@ public class CheckpointCoordinatorTest {
counter,
new StandaloneCompletedCheckpointStore(10),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1579,7 +1564,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
coord.startCheckpointScheduler();
@@ -1594,15 +1578,15 @@ public class CheckpointCoordinatorTest {
}
while ((now = System.currentTimeMillis()) < minDuration ||
(numCalls.get() < maxConcurrentAttempts && now < timeout));
-
+
assertEquals(maxConcurrentAttempts, numCalls.get());
-
+
verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
.triggerCheckpoint(anyLong(), anyLong());
-
+
// now, once we acknowledge one checkpoint, it should trigger the next one
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, new CheckpointMetaData(1L, 0L)));
-
+
// this should have immediately triggered a new checkpoint
now = System.currentTimeMillis();
timeout = now + 60000;
@@ -1612,11 +1596,11 @@ public class CheckpointCoordinatorTest {
while (numCalls.get() < maxConcurrentAttempts + 1 && now < timeout);
assertEquals(maxConcurrentAttempts + 1, numCalls.get());
-
+
// no further checkpoints should happen
Thread.sleep(200);
assertEquals(maxConcurrentAttempts + 1, numCalls.get());
-
+
coord.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
@@ -1653,7 +1637,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
coord.startCheckpointScheduler();
@@ -1668,7 +1651,7 @@ public class CheckpointCoordinatorTest {
}
while ((now = System.currentTimeMillis()) < minDuration ||
(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
-
+
// validate that the pending checkpoints are there
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
assertNotNull(coord.getPendingCheckpoints().get(1L));
@@ -1684,14 +1667,14 @@ public class CheckpointCoordinatorTest {
do {
Thread.sleep(20);
}
- while (coord.getPendingCheckpoints().get(4L) == null &&
+ while (coord.getPendingCheckpoints().get(4L) == null &&
System.currentTimeMillis() < newTimeout);
-
+
// do the final check
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
assertNotNull(coord.getPendingCheckpoints().get(3L));
assertNotNull(coord.getPendingCheckpoints().get(4L));
-
+
coord.shutdown(JobStatus.FINISHED);
}
catch (Exception e) {
@@ -1699,7 +1682,7 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testPeriodicSchedulingWithInactiveTasks() {
try {
@@ -1722,7 +1705,7 @@ public class CheckpointCoordinatorTest {
return currentState.get();
}
});
-
+
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
10, // periodic interval is 10 ms
@@ -1736,24 +1719,23 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
-
+
coord.startCheckpointScheduler();
// no checkpoint should have started so far
Thread.sleep(200);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
-
+
// now move the state to RUNNING
currentState.set(ExecutionState.RUNNING);
-
+
// the coordinator should start checkpointing now
final long timeout = System.currentTimeMillis() + 10000;
do {
Thread.sleep(20);
}
- while (System.currentTimeMillis() < timeout &&
+ while (System.currentTimeMillis() < timeout &&
coord.getNumberOfPendingCheckpoints() == 0);
assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
@@ -1789,7 +1771,6 @@ public class CheckpointCoordinatorTest {
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
@@ -1843,7 +1824,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1887,7 +1867,7 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices =
+ ExecutionVertex[] arrayExecutionVertices =
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
@@ -1904,7 +1884,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// trigger the checkpoint
@@ -1922,7 +1901,7 @@ public class CheckpointCoordinatorTest {
ChainedStateHandle<OperatorStateHandle> partitionableState = generateChainedPartitionableStateHandle(jobVertexID1, index, 2, 8, false);
KeyGroupsStateHandle partitionedKeyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -1936,7 +1915,7 @@ public class CheckpointCoordinatorTest {
ChainedStateHandle<StreamStateHandle> nonPartitionedState = generateStateForVertex(jobVertexID2, index);
ChainedStateHandle<OperatorStateHandle> partitionableState = generateChainedPartitionableStateHandle(jobVertexID2, index, 2, 8, false);
KeyGroupsStateHandle partitionedKeyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2010,7 +1989,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// trigger the checkpoint
@@ -2026,7 +2004,7 @@ public class CheckpointCoordinatorTest {
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2040,7 +2018,7 @@ public class CheckpointCoordinatorTest {
for (int index = 0; index < jobVertex2.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID2, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2109,7 +2087,7 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices =
+ ExecutionVertex[] arrayExecutionVertices =
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
@@ -2126,7 +2104,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// trigger the checkpoint
@@ -2146,7 +2123,7 @@ public class CheckpointCoordinatorTest {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
jobVertexID1, keyGroupPartitions1.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2163,7 +2140,7 @@ public class CheckpointCoordinatorTest {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
jobVertexID2, keyGroupPartitions2.get(index), false);
- SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null, 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2245,7 +2222,7 @@ public class CheckpointCoordinatorTest {
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
- ExecutionVertex[] arrayExecutionVertices =
+ ExecutionVertex[] arrayExecutionVertices =
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
@@ -2262,7 +2239,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// trigger the checkpoint
@@ -2285,7 +2261,7 @@ public class CheckpointCoordinatorTest {
KeyGroupsStateHandle keyedStateRaw = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true);
- SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw , 0);
+ SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2307,7 +2283,7 @@ public class CheckpointCoordinatorTest {
expectedOpStatesRaw.add(opStateRaw);
SubtaskState checkpointStateHandles =
new SubtaskState(new ChainedStateHandle<>(
- Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw, 0);
+ Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
jid,
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
@@ -2394,7 +2370,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
"fake-directory",
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -2771,7 +2746,6 @@ public class CheckpointCoordinatorTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// Periodic
@@ -2915,4 +2889,76 @@ public class CheckpointCoordinatorTest {
Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
Assert.assertEquals(expected, actual);
}
+
+ /**
+ * Tests that triggering a checkpoint reports a pending checkpoint (with its
+ * ID, trigger timestamp and properties) to the registered
+ * {@link CheckpointStatsTracker}.
+ */
+ @Test
+ public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
+ final long timestamp = System.currentTimeMillis();
+ ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+ // set up the coordinator
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ null,
+ Executors.directExecutor());
+
+ try {
+ // stub the tracker before handing it to the coordinator, so it never
+ // returns a null stats callback once registered
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ when(tracker.reportPendingCheckpoint(anyLong(), anyLong(), any(CheckpointProperties.class)))
+ .thenReturn(mock(PendingCheckpointStats.class));
+ coord.setCheckpointStatsTracker(tracker);
+
+ // trigger a checkpoint and verify the callback
+ assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+ verify(tracker, times(1))
+ .reportPendingCheckpoint(eq(1L), eq(timestamp), eq(CheckpointProperties.forStandardCheckpoint()));
+ } finally {
+ // shut down like the other coordinator tests do, so the triggered
+ // pending checkpoint is not left behind
+ coord.shutdown(JobStatus.FINISHED);
+ }
+ }
+
+ /**
+ * Tests that restoring the latest completed checkpoint reports the restored
+ * checkpoint to the registered {@link CheckpointStatsTracker}.
+ */
+ @Test
+ public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
+ ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID());
+
+ // a store holding a single completed checkpoint without task state to restore from
+ StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
+ store.addCheckpoint(new CompletedCheckpoint(new JobID(), 0, 0, 0, Collections.<JobVertexID, TaskState>emptyMap()));
+
+ // set up the coordinator on top of that store
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new ExecutionVertex[]{vertex1},
+ new StandaloneCheckpointIDCounter(),
+ store,
+ null,
+ Executors.directExecutor());
+
+ try {
+ CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
+ coord.setCheckpointStatsTracker(tracker);
+
+ // restore and verify the callback
+ assertTrue(coord.restoreLatestCheckpointedState(Collections.<JobVertexID, ExecutionJobVertex>emptyMap(), false, true));
+
+ verify(tracker, times(1))
+ .reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
+ } finally {
+ // shut down like the other coordinator tests do
+ coord.shutdown(JobStatus.FINISHED);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index 11bddb9..fb3bd65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -85,4 +85,31 @@ public class CheckpointPropertiesTest {
assertFalse(props.discardOnJobFailed());
assertFalse(props.discardOnJobSuspended());
}
+
+ /**
+ * Tests that {@link CheckpointProperties#isSavepoint(CheckpointProperties)}
+ * returns false for standard and externalized checkpoint properties and
+ * true for standard savepoint properties.
+ */
+ @Test
+ public void testIsSavepoint() throws Exception {
+ // regular and externalized checkpoints are not savepoints
+ assertFalse(CheckpointProperties.isSavepoint(CheckpointProperties.forStandardCheckpoint()));
+ assertFalse(CheckpointProperties.isSavepoint(CheckpointProperties.forExternalizedCheckpoint(true)));
+ assertFalse(CheckpointProperties.isSavepoint(CheckpointProperties.forExternalizedCheckpoint(false)));
+
+ // the standard savepoint properties are recognized as a savepoint
+ assertTrue(CheckpointProperties.isSavepoint(CheckpointProperties.forStandardSavepoint()));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d1f4bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7cea130..0e20ebc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -109,7 +108,6 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
// create ourselves a checkpoint with state
@@ -119,7 +117,7 @@ public class CheckpointStateRestoreTest {
PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
final long checkpointId = pending.getCheckpointId();
- SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null, 0L);
+ SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointMetaData, checkpointStateHandles));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointMetaData, checkpointStateHandles));
@@ -185,7 +183,6 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
try {
@@ -241,7 +238,6 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- new DisabledCheckpointStatsTracker(),
Executors.directExecutor());
ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest