You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/01 20:22:40 UTC

[2/3] flink git commit: [FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Handle exceptions from the CompletedCheckpointStore properly in the CheckpointCoordinator. If completing a
checkpoint fails (for example, because the CompletedCheckpointStore cannot store it), the pending checkpoint
is discarded, the completed checkpoint is cleaned up, and queued checkpoint requests are still triggered.

Fix failing SavepointCoordinatorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b734d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b734d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b734d7b

Branch: refs/heads/release-1.1
Commit: 4b734d7b8726200e5293c32f2cb9e8c77db4d378
Parents: d314bc5
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 24 18:16:28 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 1 18:00:53 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 143 +++++++++++-------
 .../runtime/checkpoint/CheckpointException.java |  35 +++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  16 +--
 .../CheckpointCoordinatorFailureTest.java       | 144 +++++++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   4 +-
 5 files changed, 275 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
NOTE(review): Summary of this file's change. The body of the "checkpoint fully acknowledged" branch
of receiveAcknowledgeMessage moves into a new private method, completePendingCheckpoint, which
asserts that the caller holds `lock`. receiveAcknowledgeMessage now returns from inside the
synchronized block and declares `throws CheckpointException` instead of `throws Exception`.
NOTE(review): The Javadoc context line of receiveAcknowledgeMessage still reads
`@throws Exception If the checkpoint cannot be added to the completed checkpoint store.`
It no longer matches the new `throws CheckpointException` signature.
NOTE(review): In completePendingCheckpoint, the try block covers finalizeCheckpoint(),
completedCheckpointStore.addCheckpoint(...), rememberRecentCheckpointId(...),
dropSubsumedCheckpoints(...) and onFullyAcknowledgedCheckpoint(...). If one of the last three
throws after addCheckpoint has succeeded (whether they can throw is not visible here), the catch
block asynchronously discards a completed checkpoint that the store already holds. Consider
limiting the try/catch to finalizeCheckpoint() and addCheckpoint().
NOTE(review): The finally block removes the pending checkpoint and calls triggerQueuedRequests()
on both the success and the failure path. lastCheckpointCompletionNanos, the completion log
line, the notify messages and the stats update only happen on success.
NOTE(review): The NotifyCheckpointComplete messages are now sent from completePendingCheckpoint,
which asserts Thread.holdsLock(lock). The removed comment stated that they were deliberately sent
outside the lock scope.
NOTE(review): The three context lines inside the new `if (ee != null) {` keep their old
five-tab indentation, which is one level deeper than the surrounding new code.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0d09922..74e6d08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -671,24 +671,17 @@ public class CheckpointCoordinator {
 	 *
 	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 	 */
-	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
+	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
 			return false;
 		}
 		if (!job.equals(message.getJob())) {
-			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 			return false;
 		}
 
 		final long checkpointId = message.getCheckpointId();
 
-		CompletedCheckpoint completed = null;
-		PendingCheckpoint checkpoint;
-
-		// Flag indicating whether the ack message was for a known pending
-		// checkpoint.
-		boolean isPendingCheckpoint;
-
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -696,45 +689,16 @@ public class CheckpointCoordinator {
 				return false;
 			}
 
-			checkpoint = pendingCheckpoints.get(checkpointId);
+			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				isPendingCheckpoint = true;
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
 					case SUCCESS:
 
 						if (checkpoint.isFullyAcknowledged()) {
-
-							lastCheckpointCompletionNanos = System.nanoTime();
-							completed = checkpoint.finalizeCheckpoint();
-
-							completedCheckpointStore.addCheckpoint(completed);
-
-							LOG.info("Completed checkpoint " + checkpointId + " (in " +
-								completed.getDuration() + " ms)");
-
-							if (LOG.isDebugEnabled()) {
-								StringBuilder builder = new StringBuilder();
-								builder.append("Checkpoint state: ");
-								for (TaskState state : completed.getTaskStates().values()) {
-									builder.append(state);
-									builder.append(", ");
-								}
-								// Remove last two chars ", "
-								builder.delete(builder.length() - 2, builder.length());
-
-								LOG.debug(builder.toString());
-							}
-
-							pendingCheckpoints.remove(checkpointId);
-							rememberRecentCheckpointId(checkpointId);
-
-							dropSubsumedCheckpoints(completed.getTimestamp());
-
-							onFullyAcknowledgedCheckpoint(completed);
-
-							triggerQueuedRequests();
+							completePendingCheckpoint(checkpoint);
+							
 						}
 						break;
 					case DUPLICATE:
@@ -757,6 +721,8 @@ public class CheckpointCoordinator {
 
 						discardState(message.getState());
 				}
+
+				return true;
 			}
 			else if (checkpoint != null) {
 				// this should not happen
@@ -764,39 +730,108 @@ public class CheckpointCoordinator {
 						"Received message for discarded but non-removed checkpoint " + checkpointId);
 			}
 			else {
+				boolean wasPendingCheckpoint;
+
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
-					isPendingCheckpoint = true;
+					wasPendingCheckpoint = true;
 					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
 				}
 				else {
 					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
-					isPendingCheckpoint = false;
+					wasPendingCheckpoint = false;
 				}
 
 				// try to discard the state so that we don't have lingering state lying around
 				discardState(message.getState());
+
+				return wasPendingCheckpoint;
+			}
+		}
+	}
+
+	/**
+	 * Try to complete the given pending checkpoint.
+	 *
+	 * Important: This method should only be called in the checkpoint lock scope.
+	 *
+	 * @param pendingCheckpoint to complete
+	 * @throws CheckpointException if the completion failed
+	 */
+	private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
+		// we have to be called in the checkpoint lock scope
+		assert(Thread.holdsLock(lock));
+
+		final long checkpointId = pendingCheckpoint.getCheckpointId();
+		CompletedCheckpoint completedCheckpoint = null;
+
+		try {
+			completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();			
+
+			completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+			rememberRecentCheckpointId(checkpointId);
+			dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+			onFullyAcknowledgedCheckpoint(completedCheckpoint);
+		} catch (Exception exception) {
+			// abort the current pending checkpoint if it has not been discarded yet
+			if (!pendingCheckpoint.isDiscarded()) {
+				pendingCheckpoint.discard(userClassLoader);
+			}
+
+			if (completedCheckpoint != null) {
+				// we failed to store the completed checkpoint. Let's clean up
+				final CompletedCheckpoint cc = completedCheckpoint;
+
+				executor.execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							cc.discard(userClassLoader);
+						} catch (Exception nestedException) {
+							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+						}
+					}
+				});
 			}
+
+			throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+		} finally {
+			pendingCheckpoints.remove(checkpointId);
+
+			triggerQueuedRequests();
 		}
+		
+		lastCheckpointCompletionNanos = System.nanoTime();
+
+		LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, completedCheckpoint.getDuration());
 
-		// send the confirmation messages to the necessary targets. we do this here
-		// to be outside the lock scope
-		if (completed != null) {
-			final long timestamp = completed.getTimestamp();
+		if (LOG.isDebugEnabled()) {
+			StringBuilder builder = new StringBuilder();
+			builder.append("Checkpoint state: ");
+			for (TaskState state : completedCheckpoint.getTaskStates().values()) {
+				builder.append(state);
+				builder.append(", ");
+			}
+			// Remove last two chars ", "
+			builder.delete(builder.length() - 2, builder.length());
+
+			LOG.debug(builder.toString());
+		}
 
-			for (ExecutionVertex ev : tasksToCommitTo) {
-				Execution ee = ev.getCurrentExecutionAttempt();
-				if (ee != null) {
+		final long timestamp = completedCheckpoint.getTimestamp();
+
+		for (ExecutionVertex ev : tasksToCommitTo) {
+			Execution ee = ev.getCurrentExecutionAttempt();
+			if (ee != null) {
 					ExecutionAttemptID attemptId = ee.getAttemptId();
 					NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
 					ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
-				}
 			}
-
-			statsTracker.onCompletedCheckpoint(completed);
 		}
 
-		return isPendingCheckpoint;
+		statsTracker.onCompletedCheckpoint(completedCheckpoint);
 	}
 
 	private void rememberRecentCheckpointId(long id) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
----------------------------------------------------------------------
NOTE(review): This adds a new checked exception, CheckpointException, with (message) and
(message, cause) constructors. In this commit, CheckpointCoordinator.completePendingCheckpoint
wraps the underlying failure in it. receiveAcknowledgeMessage now declares it in place of the
generic `throws Exception`.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
new file mode 100644
index 0000000..707878c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Base class for checkpoint related exceptions.
+ */
+public class CheckpointException extends Exception {
+
+	private static final long serialVersionUID = -4341865597039002540L;
+
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CheckpointException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
NOTE(review): finalizeCheckpoint no longer declares `throws Exception` and no longer checks
`discarded` explicitly. It only checks `Preconditions.checkState(isFullyAcknowledged(), ...)`.
isFullyAcknowledged() is not shown in this diff; confirm that it returns false for a discarded
checkpoint, otherwise finalizing a discarded checkpoint is no longer rejected.
NOTE(review): After the enclosing `if` was removed, the former if-body (context lines starting
at `CompletedCheckpoint completed =  new CompletedCheckpoint(`) keeps its old, one-level-deeper
indentation.
NOTE(review): dispose(...) now schedules the state release only when the checkpoint was not
already discarded (`!discarded && releaseState`), and sets `discarded = true` in the finally block.
A second dispose/discard call therefore does not release the state twice.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 22ba9f2..6f185bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -129,12 +129,10 @@ public class PendingCheckpoint {
 		return discarded;
 	}
 	
-	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+	public CompletedCheckpoint finalizeCheckpoint() {
 		synchronized (lock) {
-			if (discarded) {
-				throw new IllegalStateException("pending checkpoint is discarded");
-			}
-			if (notYetAcknowledgedTasks.isEmpty()) {
+			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
 				CompletedCheckpoint completed =  new CompletedCheckpoint(
 					jobId,
 					checkpointId,
@@ -144,10 +142,6 @@ public class PendingCheckpoint {
 				dispose(null, false);
 				
 				return completed;
-			}
-			else {
-				throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
-			}
 		}
 	}
 	
@@ -237,10 +231,9 @@ public class PendingCheckpoint {
 
 	private void dispose(final ClassLoader userClassLoader, boolean releaseState) {
 		synchronized (lock) {
-			discarded = true;
 			numAcknowledgedTasks = -1;
 			try {
-				if (releaseState) {
+				if (!discarded && releaseState) {
 					executor.execute(new Runnable() {
 						@Override
 						public void run() {
@@ -257,6 +250,7 @@ public class PendingCheckpoint {
 
 				}
 			} finally {
+				discarded = true;
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
NOTE(review): This adds a new test. It triggers one checkpoint on a coordinator whose
CompletedCheckpointStore always throws from addCheckpoint, and uses PowerMockito.whenNew to make
`new CompletedCheckpoint(...)` return a Mockito mock. That is why the test runs with
PowerMockRunner and @PrepareForTest(PendingCheckpoint.class). It then acknowledges the checkpoint
and expects a CheckpointException. It asserts that the pending checkpoint is discarded and that
discard(classLoader) was invoked on the mocked completed checkpoint.
NOTE(review): All other FailingCompletedCheckpointStore methods throw
UnsupportedOperationException, except getNumberOfRetainedCheckpoints, which returns -1. The test
relies on CheckpointCoordinatorTest.mockExecutionVertex(ExecutionAttemptID) being accessible from
this class. Minor: `final long checkpointId =coord...` is missing a space after `=`.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
new file mode 100644
index 0000000..e74bbd8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.util.TestExecutors;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PendingCheckpoint.class)
+public class CheckpointCoordinatorFailureTest extends TestLogger {
+
+	/**
+	 * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
+	 * will properly fail the originating pending checkpoint and clean up the completed checkpoint.
+	 */
+	@Test
+	public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+		JobID jid = new JobID();
+
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+		final long triggerTimestamp = 1L;
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			42,
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			getClass().getClassLoader(),
+			new StandaloneCheckpointIDCounter(),
+			new FailingCompletedCheckpointStore(),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
+
+		coord.triggerCheckpoint(triggerTimestamp);
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		final long checkpointId =coord.getPendingCheckpoints().keySet().iterator().next();
+
+		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId);
+
+		CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class);
+		PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
+				"store the completed checkpoint.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		// make sure that the pending checkpoint has been discarded after we could not complete it
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		verify(completedCheckpoint).discard(getClass().getClassLoader());
+	}
+
+	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+		@Override
+		public void recover() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+			throw new Exception("The failing completed checkpoint store failed again... :-(");
+		}
+
+		@Override
+		public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void shutdown() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void suspend() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public int getNumberOfRetainedCheckpoints() {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
NOTE(review): The first hunk widens mockExecutionVertex(ExecutionAttemptID) from private to
package-private, so that CheckpointCoordinatorFailureTest (same package) can reuse it. The second
hunk only removes a trailing space after `ExecutionAttemptID attemptID,`.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9159711..d02e48f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1500,7 +1500,7 @@ public class CheckpointCoordinatorTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+	static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(attemptID, 1);
 	}
 
@@ -1508,7 +1508,7 @@ public class CheckpointCoordinatorTest {
 		return mockExecutionVertex(attemptId, ExecutionState.RUNNING, parallelism);
 	}
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, 
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID,
 														ExecutionState state, ExecutionState ... successiveStates) {
 		return mockExecutionVertex(attemptID, state, 1, successiveStates);
 	}