You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/29 11:55:45 UTC
[5/5] flink git commit: [FLINK-1953] [runtime] Implement robust and
flexible checkpoint coordinator with tests.
[FLINK-1953] [runtime] Implement robust and flexible checkpoint coordinator with tests.
- Checkpoints can be configured to have different sets of tasks
that trigger the checkpoint barriers, that acknowledge the checkpoint,
and that require checkpoint confirmations.
- A configurable number of successful checkpoints can be retained
- Checkpoints time out after a certain time if not acknowledged (to prevent resource leaks)
- Checkpoints are robust to lost messages and out-of-order acknowledgements.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f0ce142
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f0ce142
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f0ce142
Branch: refs/heads/master
Commit: 7f0ce1428bc32181d6d79ca6f1226b9e2e3d93be
Parents: b1af2df
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 27 22:14:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 29 10:47:21 2015 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 380 ++++++++++++
.../runtime/checkpoint/PendingCheckpoint.java | 163 +++++
.../flink/runtime/checkpoint/StateForTask.java | 90 +++
.../checkpoint/SuccessfulCheckpoint.java | 75 +++
.../runtime/executiongraph/ExecutionVertex.java | 25 +
.../messages/checkpoint/AbortCheckpoint.java | 49 ++
.../checkpoint/AbstractCheckpointMessage.java | 91 +++
.../checkpoint/AcknowledgeCheckpoint.java | 73 +++
.../messages/checkpoint/ConfirmCheckpoint.java | 49 ++
.../messages/checkpoint/TriggerCheckpoint.java | 73 +++
.../messages/checkpoint/package-info.java | 24 +
.../flink/runtime/messages/package-info.java | 24 +
.../checkpoint/CheckpointCoordinatorTest.java | 620 +++++++++++++++++++
.../messages/CheckpointMessagesTest.java | 103 +++
14 files changed, 1839 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
new file mode 100644
index 0000000..9647ca4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ */
+public class CheckpointCoordinator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+
+ /** The number of recent checkpoints whose IDs are remembered */
+ private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
+
+
+ /** Coordinator-wide lock to safeguard the checkpoint updates */
+ private final Object lock = new Object();
+
+ /** The job whose checkpoint this coordinator coordinates */
+ private final JobID job;
+
+ /** Tasks who need to be sent a message when a checkpoint is started */
+ private final ExecutionVertex[] tasksToTrigger;
+
+ /** Tasks who need to acknowledge a checkpoint before it succeeds */
+ private final ExecutionVertex[] tasksToWaitFor;
+
+ /** Tasks who need to be sent a message when a checkpoint is confirmed */
+ private final ExecutionVertex[] tasksToCommitTo;
+
+ private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+
+ private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
+
+ private final ArrayDeque<Long> recentPendingCheckpoints;
+
+ private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+
+ private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
+
+ /** The timer that processes the checkpoint timeouts */
+ private final Timer timeoutTimer;
+
+ private final long checkpointTimeout;
+
+ private final int numSuccessfulCheckpointsToRetain;
+
+ private boolean shutdown;
+
+ // --------------------------------------------------------------------------------------------
+
+ public CheckpointCoordinator(JobID job, int numSuccessfulCheckpointsToRetain, long checkpointTimeout,
+ ExecutionVertex[] tasksToTrigger,
+ ExecutionVertex[] tasksToWaitFor,
+ ExecutionVertex[] tasksToCommitTo) {
+
+ // some sanity checks
+ if (job == null || tasksToTrigger == null ||
+ tasksToWaitFor == null || tasksToCommitTo == null) {
+ throw new NullPointerException();
+ }
+ if (numSuccessfulCheckpointsToRetain < 1) {
+ throw new IllegalArgumentException("Must retain at least one successful checkpoint");
+ }
+ if (checkpointTimeout < 1) {
+ throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+ }
+
+ this.job = job;
+ this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
+ this.checkpointTimeout = checkpointTimeout;
+ this.tasksToTrigger = tasksToTrigger;
+ this.tasksToWaitFor = tasksToWaitFor;
+ this.tasksToCommitTo = tasksToCommitTo;
+ this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
+ this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
+ this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+
+ timeoutTimer = new Timer("Checkpoint Timeout Handler", true);
+ }
+
+ /**
+ * Shuts down the checkpoint coordinator.
+ *
+ * After this method has been called, the coordinator does not accept and further
+ * messages and cannot trigger any further checkpoints.
+ */
+ public void shutdown() {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+ shutdown = true;
+
+ // shut down the thread that handles the timeouts
+ timeoutTimer.cancel();
+
+ // clear and discard all pending checkpoints
+ for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+ pending.discard();
+ }
+ pendingCheckpoints.clear();
+
+ // clean and discard all successful checkpoints
+ for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
+ checkpoint.dispose();
+ }
+ completedCheckpoints.clear();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Handling checkpoints and messages
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Triggers a new checkpoint and uses the current system time as the
+ * checkpoint time.
+ */
+ public void triggerCheckpoint() {
+ triggerCheckpoint(System.currentTimeMillis());
+ }
+
+ /**
+ * Triggers a new checkpoint and uses the given timestamp as the checkpoint
+ * timestamp.
+ *
+ * @param timestamp The timestamp for the checkpoint.
+ */
+ public boolean triggerCheckpoint(final long timestamp) {
+ if (shutdown) {
+ LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
+ return false;
+ }
+
+ final long checkpointID = checkpointIdCounter.getAndIncrement();
+ LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+
+ try {
+ // first check if all tasks that we need to trigger are running.
+ // if not, abort the checkpoint
+ ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
+ for (int i = 0; i < tasksToTrigger.length; i++) {
+ Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
+ if (ee != null) {
+ triggerIDs[i] = ee.getAttemptId();
+ } else {
+ LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
+ tasksToTrigger[i].getSimpleName());
+ return false;
+ }
+ }
+
+ // next, check if all tasks that need to acknowledge the checkpoint are running.
+ // if not, abort the checkpoint
+ Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
+ new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
+
+ for (ExecutionVertex ev : tasksToWaitFor) {
+ Execution ee = ev.getCurrentExecutionAttempt();
+ if (ee != null) {
+ ackTasks.put(ee.getAttemptId(), ev);
+ } else {
+ LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
+ ev.getSimpleName());
+ return false;
+ }
+ }
+
+ // register a new pending checkpoint. this makes sure we can properly receive acknowledgements
+ final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+
+ // schedule the timer that will clean up the expired checkpoints
+ TimerTask canceller = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ synchronized (lock) {
+ // only do the work if the checkpoint is not discarded anyways
+ // note that checkpoint completion discards the pending checkpoint object
+ if (!checkpoint.isDiscarded()) {
+ LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+ checkpoint.discard();
+ pendingCheckpoints.remove(checkpointID);
+ rememberRecentCheckpointId(checkpointID);
+ }
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Exception while handling checkpoint timeout", t);
+ }
+ }
+ };
+
+ synchronized (lock) {
+ if (shutdown) {
+ throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
+ }
+ pendingCheckpoints.put(checkpointID, checkpoint);
+ timeoutTimer.schedule(canceller, checkpointTimeout);
+ }
+
+ // send the messages to the tasks that trigger their checkpoint
+ for (int i = 0; i < tasksToTrigger.length; i++) {
+ ExecutionAttemptID id = triggerIDs[i];
+ TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
+ tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
+ }
+
+ numUnsuccessfulCheckpointsTriggers.set(0);
+ return true;
+ }
+ catch (Throwable t) {
+ int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
+ LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+
+ synchronized (lock) {
+ pendingCheckpoints.remove(checkpointID);
+ }
+
+ return false;
+ }
+ }
+
+ public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
+ if (shutdown || message == null) {
+ return;
+ }
+ if (!job.equals(message.getJob())) {
+ LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+ return;
+ }
+
+ final long checkpointId = message.getCheckpointId();
+ boolean checkpointCompleted = false;
+
+ synchronized (lock) {
+ // we need to check inside the lock for being shutdown as well, otherwise we
+ // get races and invalid error log messages
+ if (shutdown) {
+ return;
+ }
+
+ PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
+ if (checkpoint != null && !checkpoint.isDiscarded()) {
+ if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
+
+ if (checkpoint.isFullyAcknowledged()) {
+ LOG.info("Completed checkpoint " + checkpointId);
+
+ SuccessfulCheckpoint completed = checkpoint.toCompletedCheckpoint();
+ completedCheckpoints.addLast(completed);
+ if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
+ completedCheckpoints.removeFirst();
+ }
+ pendingCheckpoints.remove(checkpointId);
+ rememberRecentCheckpointId(checkpointId);
+
+ dropSubsumedCheckpoints(completed.getTimestamp());
+
+ checkpointCompleted = true;
+ }
+ }
+ else {
+ // checkpoint did not accept message
+ LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
+ + " , task " + message.getTaskExecutionId());
+ }
+ }
+ else if (checkpoint != null) {
+ // this should not happen
+ throw new IllegalStateException(
+ "Received message for discarded but non-removed checkpoint " + checkpointId);
+ }
+ else {
+ // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
+ if (recentPendingCheckpoints.contains(checkpointId)) {
+ LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
+ }
+ else {
+ LOG.info("Received message for non-existing checkpoint " + checkpointId);
+ }
+ }
+ }
+
+ // send the confirmation messages to the necessary targets. we do this here
+ // to be outside the lock scope
+ if (checkpointCompleted) {
+ for (ExecutionVertex ev : tasksToCommitTo) {
+ Execution ee = ev.getCurrentExecutionAttempt();
+ if (ee != null) {
+ ExecutionAttemptID attemptId = ee.getAttemptId();
+ ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId);
+ ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
+ }
+ }
+ }
+ }
+
+ private void rememberRecentCheckpointId(long id) {
+ if (recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
+ recentPendingCheckpoints.removeFirst();
+ }
+ recentPendingCheckpoints.addLast(id);
+ }
+
+ private void dropSubsumedCheckpoints(long timestamp) {
+ Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
+ while (entries.hasNext()) {
+ PendingCheckpoint p = entries.next().getValue();
+ if (p.getCheckpointTimestamp() < timestamp) {
+ rememberRecentCheckpointId(p.getCheckpointId());
+ p.discard();
+ entries.remove();
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Accessors
+ // --------------------------------------------------------------------------------------------
+
+ public int getNumberOfPendingCheckpoints() {
+ return this.pendingCheckpoints.size();
+ }
+
+ public int getNumberOfRetainedSuccessfulCheckpoints() {
+ return this.completedCheckpoints.size();
+ }
+
+ public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
+ synchronized (lock) {
+ return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
+ }
+ }
+
+ public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
+ synchronized (lock) {
+ return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
new file mode 100644
index 0000000..e221238
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A pending checkpoint is a checkpoint that has been started, but has not been
+ * acknowledged by all tasks that need to acknowledge it. Once all tasks have
+ * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ *
+ * <p>All state changes are guarded by an internal lock. Once discarded (explicitly via
+ * {@link #discard()}, or implicitly by {@link #toCompletedCheckpoint()}), the pending
+ * checkpoint accepts no further acknowledgements.
+ */
+public class PendingCheckpoint {
+
+	/** Guards the mutable fields of this pending checkpoint */
+	private final Object lock = new Object();
+
+	private final JobID jobId;
+
+	private final long checkpointId;
+
+	private final long checkpointTimestamp;
+
+	/** The states reported by acknowledging tasks (acknowledgements without state add nothing) */
+	private final List<StateForTask> collectedStates;
+
+	/** The tasks that still need to acknowledge, by execution attempt */
+	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
+
+	/** Number of tasks that acknowledged so far; set to -1 once discarded */
+	private int numAcknowledgedTasks;
+
+	private boolean discarded;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a new pending checkpoint.
+	 *
+	 * <p>Note: the given map is not copied. This pending checkpoint takes ownership of it and
+	 * removes entries from it as tasks acknowledge (and clears it on discard).
+	 *
+	 * @param jobId The job the checkpoint belongs to.
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointTimestamp The timestamp of the checkpoint.
+	 * @param verticesToConfirm The tasks that must acknowledge the checkpoint, by execution attempt.
+	 *
+	 * @throws NullPointerException If the job ID or the map is null.
+	 * @throws IllegalArgumentException If the map is empty.
+	 */
+	public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
+								Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm)
+	{
+		if (jobId == null || verticesToConfirm == null) {
+			throw new NullPointerException();
+		}
+		if (verticesToConfirm.size() == 0) {
+			throw new IllegalArgumentException("Checkpoint needs at least one vertex that commits the checkpoint");
+		}
+
+		this.jobId = jobId;
+		this.checkpointId = checkpointId;
+		this.checkpointTimestamp = checkpointTimestamp;
+
+		this.notYetAcknowledgedTasks = verticesToConfirm;
+		this.collectedStates = new ArrayList<StateForTask>(notYetAcknowledgedTasks.size());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public long getCheckpointTimestamp() {
+		return checkpointTimestamp;
+	}
+
+	/** Returns the number of tasks that still need to acknowledge (0 once discarded). */
+	public int getNumberOfNonAcknowledgedTasks() {
+		synchronized (lock) {
+			return notYetAcknowledgedTasks.size();
+		}
+	}
+
+	/** Returns the number of tasks that acknowledged so far (-1 once discarded). */
+	public int getNumberOfAcknowledgedTasks() {
+		synchronized (lock) {
+			return numAcknowledgedTasks;
+		}
+	}
+
+	/** Returns true if all tasks acknowledged and the checkpoint has not been discarded. */
+	public boolean isFullyAcknowledged() {
+		synchronized (lock) {
+			return this.notYetAcknowledgedTasks.isEmpty() && !discarded;
+		}
+	}
+
+	public boolean isDiscarded() {
+		synchronized (lock) {
+			return discarded;
+		}
+	}
+
+	/**
+	 * Returns the live (not copied) list of states collected so far. The list is cleared
+	 * when the checkpoint is discarded.
+	 */
+	public List<StateForTask> getCollectedStates() {
+		return collectedStates;
+	}
+
+	/**
+	 * Converts this fully acknowledged pending checkpoint into a successful checkpoint and
+	 * discards this pending checkpoint.
+	 *
+	 * @return The successful checkpoint with a copy of the collected states.
+	 * @throws IllegalStateException If this checkpoint is discarded or not fully acknowledged.
+	 */
+	public SuccessfulCheckpoint toCompletedCheckpoint() {
+		synchronized (lock) {
+			if (discarded) {
+				throw new IllegalStateException("pending checkpoint is discarded");
+			}
+			if (notYetAcknowledgedTasks.isEmpty()) {
+				SuccessfulCheckpoint completed =  new SuccessfulCheckpoint(jobId, checkpointId,
+						checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
+				discard();
+				return completed;
+			}
+			else {
+				throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
+			}
+		}
+	}
+
+	/**
+	 * Records the acknowledgement of the given execution attempt, collecting its state if non-null.
+	 *
+	 * @param attemptID The acknowledging execution attempt.
+	 * @param state The state reported by the task, or null if it reported none.
+	 * @return True, if the acknowledgement was accepted; false, if this checkpoint is discarded
+	 *         or the attempt is not (or no longer) expected to acknowledge.
+	 */
+	public boolean acknowledgeTask(ExecutionAttemptID attemptID, StateHandle state) {
+		synchronized (lock) {
+			if (discarded) {
+				return false;
+			}
+
+			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
+			if (vertex != null) {
+				if (state != null) {
+					collectedStates.add(new StateForTask(state, vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
+				}
+				numAcknowledgedTasks++;
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+
+	/**
+	 * Discards the pending checkpoint, releasing all held resources.
+	 */
+	public void discard() {
+		synchronized (lock) {
+			discarded = true;
+			numAcknowledgedTasks = -1;
+			collectedStates.clear();
+			notYetAcknowledgedTasks.clear();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("PendingCheckpoint %d @ %d - confirmed=%d, pending=%d",
+				checkpointId, checkpointTimestamp,
+				getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
new file mode 100644
index 0000000..83a6dc8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * Simple bean to describe the state belonging to a parallel operator.
+ * Since we hold the state across execution attempts, we identify a task by its
+ * JobVertexId and subtask index.
+ */
+public class StateForTask {
+
+ /** The state of the parallel operator */
+ private final StateHandle state;
+
+ /** The vertex id of the parallel operator */
+ private final JobVertexID operatorId;
+
+ /** The index of the parallel subtask */
+ private final int subtask;
+
+ public StateForTask(StateHandle state, JobVertexID operatorId, int subtask) {
+ if (state == null || operatorId == null || subtask < 0) {
+ throw new IllegalArgumentException();
+ }
+
+ this.state = state;
+ this.operatorId = operatorId;
+ this.subtask = subtask;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public StateHandle getState() {
+ return state;
+ }
+
+ public JobVertexID getOperatorId() {
+ return operatorId;
+ }
+
+ public int getSubtask() {
+ return subtask;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o instanceof StateForTask) {
+ StateForTask that = (StateForTask) o;
+ return this.subtask == that.subtask && this.operatorId.equals(that.operatorId)
+ && this.state.equals(that.state);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return state.hashCode() + 31 * operatorId.hashCode() + 43 * subtask;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("StateForTask %s-%d : %s", operatorId, subtask, state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
new file mode 100644
index 0000000..cd7efba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/**
+ * Describes a checkpoint that has been acknowledged (with state) by all tasks that had to
+ * acknowledge it, and which is therefore considered completed.
+ */
+public class SuccessfulCheckpoint {
+
+	/** The job the checkpoint belongs to */
+	private final JobID job;
+
+	/** The ID of the checkpoint */
+	private final long checkpointID;
+
+	/** The timestamp of the checkpoint */
+	private final long timestamp;
+
+	/** The states of the tasks, as collected during the checkpoint (not copied) */
+	private final List<StateForTask> states;
+
+
+	public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
+		this.states = states;
+		this.timestamp = timestamp;
+		this.checkpointID = checkpointID;
+		this.job = job;
+	}
+
+	public JobID getJobId() {
+		return this.job;
+	}
+
+	public long getCheckpointID() {
+		return this.checkpointID;
+	}
+
+	public long getTimestamp() {
+		return this.timestamp;
+	}
+
+	/** Returns the live list of task states; it is empty after {@link #dispose()}. */
+	public List<StateForTask> getStates() {
+		return this.states;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Releases the checkpoint by clearing its list of task states. */
+	public void dispose() {
+		this.states.clear();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e7e019f..a44fc6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import akka.actor.ActorRef;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -448,6 +449,30 @@ public class ExecutionVertex implements Serializable {
this.currentExecution.fail(t);
}
+ public void sendMessageToCurrentExecution(Serializable message, ExecutionAttemptID attemptID) {
+ Execution exec = getCurrentExecutionAttempt();
+
+ // check that this is for the correct execution attempt
+ if (exec != null && exec.getAttemptId().equals(attemptID)) {
+ SimpleSlot slot = exec.getAssignedResource();
+
+ // send only if we actually have a target
+ if (slot != null) {
+ ActorRef taskManager = slot.getInstance().getTaskManager();
+ if (taskManager != null) {
+ taskManager.tell(message, ActorRef.noSender());
+ }
+ }
+ else {
+ LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
+ }
+ }
+ else {
+ LOG.debug("Skipping message to {}/{} because it does not match the current execution",
+ getSimpleName(), attemptID);
+ }
+ }
+
/**
* Schedules or updates the consumer tasks of the result partition with the given ID.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
new file mode 100644
index 0000000..0493ba6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
+ * with the given ID has been aborted.
+ *
+ * <p>NOTE(review): the checkpoint coordinator in this change does not send this message yet;
+ * confirm the exact task-side handling once a sender exists.
+ */
+public class AbortCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = 2094094662279578953L;
+
+	/**
+	 * @param job The job the checkpoint belongs to.
+	 * @param taskExecutionId The execution attempt the message is addressed to.
+	 * @param checkpointId The ID of the aborted checkpoint.
+	 */
+	public AbortCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		super(job, taskExecutionId, checkpointId);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		// equal only to other AbortCheckpoint messages with equal base fields
+		return this == o || ( (o instanceof AbortCheckpoint) && super.equals(o));
+	}
+
+	@Override
+	public String toString() {
+		return String.format("AbortCheckpoint %d for (%s/%s)",
+				getCheckpointId(), getJob(), getTaskExecutionId());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
new file mode 100644
index 0000000..dc50dc9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * The base class of all checkpoint messages.
+ */
+public abstract class AbstractCheckpointMessage implements java.io.Serializable {
+
+ private static final long serialVersionUID = 186780414819428178L;
+
+ /** The job to which this message belongs */
+ private final JobID job;
+
+ /** The task execution that is source/target of the checkpoint message */
+ private final ExecutionAttemptID taskExecutionId;
+
+ /** The ID of the checkpoint that this message coordinates */
+ private final long checkpointId;
+
+
+ protected AbstractCheckpointMessage(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ if (job == null || taskExecutionId == null) {
+ throw new NullPointerException();
+ }
+
+ this.job = job;
+ this.taskExecutionId = taskExecutionId;
+ this.checkpointId = checkpointId;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public JobID getJob() {
+ return job;
+ }
+
+ public ExecutionAttemptID getTaskExecutionId() {
+ return taskExecutionId;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return job.hashCode() + taskExecutionId.hashCode() + (int) (checkpointId ^ (checkpointId >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o instanceof AbstractCheckpointMessage) {
+ AbstractCheckpointMessage that = (AbstractCheckpointMessage) o;
+ return this.job.equals(that.job) && this.taskExecutionId.equals(that.taskExecutionId) &&
+ this.checkpointId == that.checkpointId;
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "(" + checkpointId + ':' + job + '/' + taskExecutionId + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
new file mode 100644
index 0000000..dd94e37
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
+ * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
+ * individual task is completed.
+ *
+ * This message may carry the handle to the task's state. The state is optional and is
+ * {@code null} when the acknowledgement was created without one.
+ */
+public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = -7606214777192401493L;
+
+	/** The handle to the task's state for this checkpoint, may be null */
+	private final StateHandle state;
+
+	/**
+	 * Creates an acknowledgement that carries no state handle.
+	 *
+	 * @param job The job to which the checkpoint belongs.
+	 * @param taskExecutionId The task execution that acknowledges the checkpoint.
+	 * @param checkpointId The ID of the acknowledged checkpoint.
+	 */
+	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		this(job, taskExecutionId, checkpointId, null);
+	}
+
+	/**
+	 * Creates an acknowledgement that carries the given (possibly null) state handle.
+	 *
+	 * @param job The job to which the checkpoint belongs.
+	 * @param taskExecutionId The task execution that acknowledges the checkpoint.
+	 * @param checkpointId The ID of the acknowledged checkpoint.
+	 * @param state The handle to the task's state, or null.
+	 */
+	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, StateHandle state) {
+		super(job, taskExecutionId, checkpointId);
+		this.state = state;
+	}
+
+	/** @return The handle to the task's state, or null if the message carries none. */
+	public StateHandle getState() {
+		return state;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	// equals() compares the state as well, so hashCode() includes it too (consistent with the
+	// way TriggerCheckpoint folds its extra field into the hash).
+	@Override
+	public int hashCode() {
+		return super.hashCode() + (state == null ? 0 : state.hashCode());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof AcknowledgeCheckpoint) {
+			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
+			return super.equals(o) && (this.state == null ? that.state == null :
+					(that.state != null && this.state.equals(that.state)));
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		// named after the message type, so it is not confused with ConfirmCheckpoint in logs
+		return String.format("AcknowledgeCheckpoint %d for (%s/%s) - state=%s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), state);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
new file mode 100644
index 0000000..cdfd202
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
+ * has been confirmed and that the task can commit the checkpoint to the outside world.
+ */
+public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+ private static final long serialVersionUID = 2094094662279578953L;
+
+ public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ super(job, taskExecutionId, checkpointId);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o || ( (o instanceof ConfirmCheckpoint) && super.equals(o));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ConfirmCheckpoint %d for (%s/%s)",
+ getCheckpointId(), getJob(), getTaskExecutionId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
new file mode 100644
index 0000000..0528755
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a certain task to trigger its
+ * checkpoint.
+ */
+public class TriggerCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+ private static final long serialVersionUID = 2094094662279578953L;
+
+ /** The timestamp associated with the checkpoint */
+ private final long timestamp;
+
+ public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+ super(job, taskExecutionId, checkpointId);
+ this.timestamp = timestamp;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o instanceof TriggerCheckpoint) {
+ TriggerCheckpoint that = (TriggerCheckpoint) o;
+ return this.timestamp == that.timestamp && super.equals(o);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Trigger Checkpoint %d@%d for (%s/%s)",
+ getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
new file mode 100644
index 0000000..7b96b81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the
+ * distributed dataflow.
+ */
+package org.apache.flink.runtime.messages.checkpoint;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
new file mode 100644
index 0000000..e0b8cce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the messages that are sent between actors, like the
+ * {@link org.apache.flink.runtime.jobmanager.JobManager} and
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations.
+ */
+package org.apache.flink.runtime.messages;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
new file mode 100644
index 0000000..aee0e63
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for the checkpoint coordinator.
+ */
+public class CheckpointCoordinatorTest {
+
+	@Test
+	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
+		try {
+			final JobID jobId = new JobID();
+			final long now = System.currentTimeMillis();
+
+			// trigger vertices are plain mocks (not set up via mockExecutionVertex()), so the
+			// trigger tasks count as not being executed
+			ExecutionVertex[] triggerVertices = {
+					mock(ExecutionVertex.class),
+					mock(ExecutionVertex.class) };
+
+			// ack vertices, each with its own running execution attempt
+			ExecutionVertex[] ackVertices = {
+					mockExecutionVertex(new ExecutionAttemptID()),
+					mockExecutionVertex(new ExecutionAttemptID()) };
+
+			CheckpointCoordinator coordinator = new CheckpointCoordinator(
+					jobId, 1, 600000, triggerVertices, ackVertices, new ExecutionVertex[0]);
+
+			// a fresh coordinator has neither pending nor completed checkpoints
+			assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// triggering must be refused ...
+			assertFalse(coordinator.triggerCheckpoint(now));
+
+			// ... and must not leave any checkpoint behind
+			assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coordinator.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+
+			// create some plain mock Execution vertices that need to ack the checkpoint;
+			// they are not set up via mockExecutionVertex(), so the ack tasks are not executed
+			ExecutionVertex ackVertex1 = mock(ExecutionVertex.class);
+			ExecutionVertex ackVertex2 = mock(ExecutionVertex.class);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {} );
+
+			// nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should not succeed, since the ack tasks are not executed
+			assertFalse(coord.triggerCheckpoint(timestamp));
+
+			// still, nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testTriggerAndConfirmSimpleCheckpoint() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			// (the same two vertices trigger, acknowledge, and receive confirmations)
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 });
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp));
+
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+
+			assertNotNull(checkpoint);
+			assertEquals(checkpointId, checkpoint.getCheckpointId());
+			assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint.getCollectedStates().size());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// check that the vertices received the trigger checkpoint message
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+
+			// acknowledge from one of the tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the same task again (should not matter)
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the other task.
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+
+			// the checkpoint is internally converted to a successful checkpoint and the
+			// pending checkpoint object is disposed
+			assertTrue(checkpoint.isDiscarded());
+
+			// now we should have a completed checkpoint
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+			// validate that the relevant tasks got a confirmation message
+			{
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+			}
+
+			SuccessfulCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+			assertEquals(jid, success.getJobId());
+			assertEquals(timestamp, success.getTimestamp());
+			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
+			assertTrue(success.getStates().isEmpty());
+
+			// ---------------
+			// trigger another checkpoint and see that this one replaces the other checkpoint
+			// ---------------
+			final long timestampNew = timestamp + 7;
+
+			// assert the trigger result, so a failed trigger is reported here instead of as
+			// a NoSuchElementException when looking up the pending checkpoint below
+			assertTrue(coord.triggerCheckpoint(timestampNew));
+
+			long checkpointIdNew = coord.getPendingCheckpoints().keySet().iterator().next();
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			SuccessfulCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+			assertEquals(jid, successNew.getJobId());
+			assertEquals(timestampNew, successNew.getTimestamp());
+			assertEquals(checkpointIdNew, successNew.getCheckpointID());
+			assertTrue(successNew.getStates().isEmpty());
+
+			// validate that the relevant tasks got the trigger and confirmation messages
+			// for the new checkpoint
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+			}
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+
+	@Test
+	public void testMultipleConcurrentCheckpoints() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp1 = System.currentTimeMillis();
+			final long timestamp2 = timestamp1 + 8617;
+
+			// create some mock execution vertices
+
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+			ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
+
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			// set up the coordinator (retaining 2 successful checkpoints) and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 2, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+					new ExecutionVertex[] { commitVertex });
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp1));
+
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			long checkpointId1 = pending1.getCheckpointId();
+
+			// trigger messages should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+
+			// acknowledge one of the three tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+
+			// trigger the second checkpoint while the first one is still pending. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp2));
+
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// pick the pending checkpoint that is not the first one
+			PendingCheckpoint pending2;
+			{
+				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				PendingCheckpoint cc1 = all.next();
+				PendingCheckpoint cc2 = all.next();
+				pending2 = pending1 == cc1 ? cc2 : cc1;
+			}
+			long checkpointId2 = pending2.getCheckpointId();
+
+			// trigger messages should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
+
+			// we acknowledge the remaining two tasks from the first
+			// checkpoint and two tasks from the second checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+
+			// now, the first checkpoint should be confirmed
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertTrue(pending1.isDiscarded());
+
+			// the first confirm message should be out
+			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1), commitAttemptID);
+
+			// send the last remaining ack for the second checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+
+			// now, the second checkpoint should be confirmed
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertTrue(pending2.isDiscarded());
+
+			// the second commit message should be out
+			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+
+			// validate the committed checkpoints
+			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
+
+			SuccessfulCheckpoint sc1 = scs.get(0);
+			assertEquals(checkpointId1, sc1.getCheckpointID());
+			assertEquals(timestamp1, sc1.getTimestamp());
+			assertEquals(jid, sc1.getJobId());
+			assertTrue(sc1.getStates().isEmpty());
+
+			SuccessfulCheckpoint sc2 = scs.get(1);
+			assertEquals(checkpointId2, sc2.getCheckpointID());
+			assertEquals(timestamp2, sc2.getTimestamp());
+			assertEquals(jid, sc2.getJobId());
+			assertTrue(sc2.getStates().isEmpty());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+ /**
+ * Tests that a checkpoint which completes while an older checkpoint is still pending subsumes
+ * the older one: both pending checkpoints end up discarded, only the newer checkpoint is
+ * retained as successful and confirmed, and a late acknowledgement for the older checkpoint
+ * has no effect.
+ */
+ @Test
+ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
+ try {
+ final JobID jid = new JobID();
+ final long timestamp1 = System.currentTimeMillis();
+ final long timestamp2 = timestamp1 + 1552;
+
+ // create some mock execution vertices
+ final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+
+ final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
+
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+ ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+
+ ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+ ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+ ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
+
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid, 10, 600000,
+ new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+ new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+ new ExecutionVertex[] { commitVertex });
+
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // trigger the first checkpoint. this should succeed
+ assertTrue(coord.triggerCheckpoint(timestamp1));
+
+ assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+ long checkpointId1 = pending1.getCheckpointId();
+
+ // trigger messages should have been sent
+ verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+ new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
+ verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+ new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+
+ // acknowledge one of the three tasks
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+
+ // trigger the second checkpoint while the first one is still pending. this should succeed
+ assertTrue(coord.triggerCheckpoint(timestamp2));
+
+ assertEquals(2, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // of the two pending checkpoints, the one that is not the first must be the second
+ PendingCheckpoint pending2;
+ {
+ Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+ PendingCheckpoint cc1 = all.next();
+ PendingCheckpoint cc2 = all.next();
+ pending2 = pending1 == cc1 ? cc2 : cc1;
+ }
+ long checkpointId2 = pending2.getCheckpointId();
+
+ // trigger messages should have been sent
+ verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+ new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
+ verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+ new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
+
+ // we acknowledge one more task from the first checkpoint and the second
+ // checkpoint completely. The second checkpoint should then subsume the first checkpoint
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+
+ // now, the second checkpoint should be confirmed, and the first discarded
+ // actually both pending checkpoints are discarded, and the second has been transformed
+ // into a successful checkpoint
+ assertTrue(pending1.isDiscarded());
+ assertTrue(pending2.isDiscarded());
+
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // validate the committed checkpoints
+ List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
+ SuccessfulCheckpoint success = scs.get(0);
+ assertEquals(checkpointId2, success.getCheckpointID());
+ assertEquals(timestamp2, success.getTimestamp());
+ assertEquals(jid, success.getJobId());
+ assertTrue(success.getStates().isEmpty());
+
+ // the confirm message for the second checkpoint should be out
+ verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+ new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+
+ // send the last remaining ack for the first checkpoint. This should not do anything
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+
+ // check that the late ack indeed changed nothing
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(checkpointId2, coord.getSuccessfulCheckpoints().get(0).getCheckpointID());
+
+ // the subsumed first checkpoint must never be confirmed
+ verify(commitVertex, times(0)).sendMessageToCurrentExecution(
+ new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1), commitAttemptID);
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ /**
+ * Tests that a partially acknowledged checkpoint is discarded once its timeout (200 ms here)
+ * expires, that it is neither retained nor confirmed, and that a late acknowledgement from the
+ * remaining task does not revive it.
+ */
+ @Test
+ public void testCheckpointTimeoutIsolated() {
+ try {
+ final JobID jid = new JobID();
+ final long timestamp = System.currentTimeMillis();
+
+ // create some mock execution vertices
+
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+
+ final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+
+ ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+ ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ // set up the coordinator
+ // the timeout for the checkpoint is 200 milliseconds
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid, 2, 200,
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex1, ackVertex2 },
+ new ExecutionVertex[] { commitVertex });
+
+ // trigger a checkpoint, partially acknowledged
+ assertTrue(coord.triggerCheckpoint(timestamp));
+ assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+ PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
+ assertFalse(checkpoint.isDiscarded());
+
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));
+
+ // wait until the checkpoint must have expired.
+ // we check every 250 msecs conservatively for 5 seconds
+ // to give even slow build servers a very good chance of completing this.
+ // we wait for both the discard flag and the removal from the pending set, because the
+ // test should not assume that this thread observes both effects of the timeout at once
+ long deadline = System.currentTimeMillis() + 5000;
+ do {
+ Thread.sleep(250);
+ }
+ while ((!checkpoint.isDiscarded() || coord.getNumberOfPendingCheckpoints() > 0)
+ && System.currentTimeMillis() < deadline);
+
+ assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // a late acknowledgement from the remaining task must not revive the expired checkpoint
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpoint.getCheckpointId()));
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // no confirm message must have been sent
+ verify(commitVertex, times(0))
+ .sendMessageToCurrentExecution(any(ConfirmCheckpoint.class), any(ExecutionAttemptID.class));
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Tests that acknowledgements for a different job, for an unknown checkpoint, or from an
+ * unknown task do not throw and leave the pending checkpoint untouched.
+ */
+ @Test
+ public void handleMessagesForNonExistingCheckpoints() {
+ try {
+ final JobID jid = new JobID();
+ final long timestamp = System.currentTimeMillis();
+
+ // create some mock execution vertices and trigger some checkpoint
+
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+ ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+ ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid, 2, 200000,
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex1, ackVertex2 },
+ new ExecutionVertex[] { commitVertex });
+
+ assertTrue(coord.triggerCheckpoint(timestamp));
+
+ long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+ PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
+
+ // send some messages that do not belong to either the job or any
+ // of the vertices that need to acknowledge the checkpoint.
+ // none of the messages should throw an exception
+
+ // wrong job id
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
+
+ // unknown checkpoint. the ID is derived from the pending one so that it can never collide
+ // with it, whatever value the coordinator starts numbering checkpoints from
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId + 1L));
+
+ // unknown ack vertex
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
+
+ // none of the messages may have affected the pending checkpoint
+ assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertFalse(pending.isDiscarded());
+ verify(commitVertex, times(0))
+ .sendMessageToCurrentExecution(any(ConfirmCheckpoint.class), any(ExecutionAttemptID.class));
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Builds a Mockito mock of an {@link ExecutionVertex} whose current execution attempt
+ * is itself a mock that reports the given attempt ID.
+ *
+ * @param attemptID the ID returned by the mocked current execution's {@code getAttemptId()}
+ * @return the mocked vertex
+ */
+ private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+ ExecutionVertex mockedVertex = mock(ExecutionVertex.class);
+ Execution currentAttempt = mock(Execution.class);
+
+ when(currentAttempt.getAttemptId()).thenReturn(attemptID);
+ when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt);
+ return mockedVertex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
new file mode 100644
index 0000000..fe1a598
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.AbortCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Tests that the checkpoint messages survive Java serialization and implement
+ * {@code equals}, {@code hashCode} and {@code toString} consistently.
+ */
+public class CheckpointMessagesTest {
+
+ @Test
+ public void testTriggerAndConfirmCheckpoint() {
+ try {
+ ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L);
+ testSerializabilityEqualsHashCode(cc);
+
+ TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+ testSerializabilityEqualsHashCode(tc);
+
+ AbortCheckpoint ac = new AbortCheckpoint(new JobID(), new ExecutionAttemptID(), 1365762983745L);
+ testSerializabilityEqualsHashCode(ac);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConfirmTaskCheckpointed() {
+ try {
+ AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
+ new JobID(), new ExecutionAttemptID(), 569345L);
+
+ AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
+ new JobID(), new ExecutionAttemptID(), 87658976143L, new MyHandle());
+
+ testSerializabilityEqualsHashCode(noState);
+ testSerializabilityEqualsHashCode(withState);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Copies the given object through serialization and checks that the copy is a distinct
+ * instance, is equal to the original, and has the same hash code. Also checks that both
+ * have a non-null string representation.
+ *
+ * @param o the object to check
+ * @throws IOException if the serialization round trip fails
+ */
+ private static void testSerializabilityEqualsHashCode(Serializable o) throws IOException {
+ Object copy = CommonTestUtils.createCopySerializable(o);
+ // if the copy were the same instance, the equality checks below would be trivially true
+ assertNotSame(o, copy);
+ assertEquals(o, copy);
+ assertEquals(o.hashCode(), copy.hashCode());
+ assertNotNull(o.toString());
+ assertNotNull(copy.toString());
+ }
+
+ /**
+ * Minimal state handle for the tests. All instances are equal to each other, so a
+ * deserialized copy compares equal to its original.
+ */
+ private static class MyHandle implements StateHandle {
+
+ private static final long serialVersionUID = 8128146204128728332L;
+
+ @Override
+ public Map<String, OperatorState<?>> getState(ClassLoader userClassloader) {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // the equals contract requires equals(null) to return false rather than throw
+ return obj != null && obj.getClass() == this.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+}