You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 16:46:49 UTC

[1/4] flink git commit: [FLINK-5285] Abort checkpoint only once in BarrierTracker

Repository: flink
Updated Branches:
  refs/heads/master 41d5875bf -> 0c42d258e


[FLINK-5285] Abort checkpoint only once in BarrierTracker

Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
from triggering a flood of cancellation markers for downstream operators. This is done by
aborting each checkpoint only once and by not re-creating checkpoint barrier counts for
already aborted checkpoints.

Add test case

This closes #2963.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3f19a5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3f19a5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3f19a5b

Branch: refs/heads/master
Commit: d3f19a5bead1d0709da733b75d729afa9341c250
Parents: 41d5875
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 19:05:47 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:41:15 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  5 ++-
 .../streaming/runtime/io/BarrierTracker.java    | 25 ++++++++-----
 .../runtime/io/BarrierTrackerTest.java          | 37 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0940133..0baf126 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -313,6 +313,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentCheckpointId = barrierId;
 				startOfAlignmentTimestamp = 0L;
 				latestAlignmentDurationNanos = 0L;
+
+				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
+
 				notifyAbortOnCancellationBarrier(barrierId);
 			}
 
@@ -422,7 +425,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		startOfAlignmentTimestamp = System.nanoTime();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+			LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index f497f4b..9351f1b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -211,6 +211,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		CheckpointBarrierCount cbc;
 		while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
 			pendingCheckpoints.removeFirst();
+
+			if (cbc.markAborted()) {
+				// abort the subsumed checkpoints if not already done
+				notifyAbort(cbc.checkpointId());
+			}
 		}
 
 		if (cbc != null && cbc.checkpointId() == checkpointId) {
@@ -226,17 +231,19 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				pendingCheckpoints.removeFirst();
 			}
 		}
-		else {
+		else if (checkpointId > latestPendingCheckpointID) {
 			notifyAbort(checkpointId);
 
-			// first barrier for this checkpoint - remember it as aborted
-			// since we polled away all entries with lower checkpoint IDs
-			// this entry will become the new first entry
-			if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
-				CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
-				abortedMarker.markAborted();
-				pendingCheckpoints.addFirst(abortedMarker);
-			}
+			latestPendingCheckpointID = checkpointId;
+
+			CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+			abortedMarker.markAborted();
+			pendingCheckpoints.addFirst(abortedMarker);
+
+			// we have removed all other pending checkpoint barrier counts --> no need to check that
+			// we don't exceed the maximum checkpoints to track
+		} else {
+			// trailing cancellation barrier which was already cancelled
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 7ae144d..0d9e6ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -36,6 +36,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the behavior of the barrier tracker.
@@ -426,6 +431,38 @@ public class BarrierTrackerTest {
 
 		assertTrue(tracker.isEmpty());
 	}
+
+	/**
+	 * Tests that each checkpoint is only aborted once in case of an interleaved cancellation
+	 * barrier arrival of two consecutive checkpoints.
+	 */
+	@Test
+	public void testInterleavedCancellationBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+			createBarrier(1L, 0),
+			createCancellationBarrier(2L, 0),
+			createCancellationBarrier(1L, 1),
+			createCancellationBarrier(2L, 1),
+			createCancellationBarrier(1L, 2),
+			createCancellationBarrier(2L, 2),
+			createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierTracker tracker = new BarrierTracker(gate);
+		StatefulTask statefulTask = mock(StatefulTask.class);
+
+		tracker.registerCheckpointEventHandler(statefulTask);
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+		}
+
+		verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), any(Throwable.class));
+		verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), any(Throwable.class));
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utils


[4/4] flink git commit: [FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Posted by tr...@apache.org.
[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator

Handle exceptions from the CompletedCheckpointStore properly in the CheckpointCoordinator. This
means that in case of an exception, the completed checkpoint is properly cleaned up and
subsequent checkpoints are still triggered.

This closes #2872.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c42d258
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c42d258
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c42d258

Branch: refs/heads/master
Commit: 0c42d258e9d9d30eeeee7d0f1487ef0ac8b90fa4
Parents: add3765
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 24 18:16:28 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 16:05:43 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 179 ++++++++++++-------
 .../runtime/checkpoint/CheckpointException.java |  35 ++++
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  65 +++----
 .../CheckpointCoordinatorFailureTest.java       | 141 +++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   2 +-
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |   1 -
 8 files changed, 318 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5f0fd74..3ce7a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -609,25 +609,18 @@ public class CheckpointCoordinator {
 	 *
 	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 	 */
-	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
+	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
 			return false;
 		}
 
 		if (!job.equals(message.getJob())) {
-			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 			return false;
 		}
 
 		final long checkpointId = message.getCheckpointId();
 
-		CompletedCheckpoint completed = null;
-		PendingCheckpoint checkpoint;
-
-		// Flag indicating whether the ack message was for a known pending
-		// checkpoint.
-		boolean isPendingCheckpoint;
-
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
 			// get races and invalid error log messages
@@ -635,10 +628,9 @@ public class CheckpointCoordinator {
 				return false;
 			}
 
-			checkpoint = pendingCheckpoints.get(checkpointId);
+			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				isPendingCheckpoint = true;
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
 					case SUCCESS:
@@ -646,37 +638,7 @@ public class CheckpointCoordinator {
 							checkpointId, message.getTaskExecutionId(), message.getJob());
 
 						if (checkpoint.isFullyAcknowledged()) {
-
-							// record the time when this was completed, to calculate
-							// the 'min delay between checkpoints'
-							lastCheckpointCompletionNanos = System.nanoTime();
-
-							// complete the checkpoint structure
-							completed = checkpoint.finalizeCheckpoint();
-							completedCheckpointStore.addCheckpoint(completed);
-
-							LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
-								completed.getStateSize(), completed.getDuration());
-
-							if (LOG.isDebugEnabled()) {
-								StringBuilder builder = new StringBuilder();
-								builder.append("Checkpoint state: ");
-								for (TaskState state : completed.getTaskStates().values()) {
-									builder.append(state);
-									builder.append(", ");
-								}
-								// Remove last two chars ", "
-								builder.delete(builder.length() - 2, builder.length());
-
-								LOG.debug(builder.toString());
-							}
-
-							pendingCheckpoints.remove(checkpointId);
-							rememberRecentCheckpointId(checkpointId);
-
-							dropSubsumedCheckpoints(completed.getCheckpointID());
-
-							triggerQueuedRequests();
+							completePendingCheckpoint(checkpoint);
 						}
 						break;
 					case DUPLICATE:
@@ -700,6 +662,8 @@ public class CheckpointCoordinator {
 
 						discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
 				}
+
+				return true;
 			}
 			else if (checkpoint != null) {
 				// this should not happen
@@ -707,39 +671,106 @@ public class CheckpointCoordinator {
 						"Received message for discarded but non-removed checkpoint " + checkpointId);
 			}
 			else {
+				boolean wasPendingCheckpoint;
+
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
-					isPendingCheckpoint = true;
+					wasPendingCheckpoint = true;
 					LOG.warn("Received late message for now expired checkpoint attempt {} from " +
 						"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
 				}
 				else {
 					LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
 						checkpointId, message.getTaskExecutionId(), message.getJob());
-					isPendingCheckpoint = false;
+					wasPendingCheckpoint = false;
 				}
 
 				// try to discard the state so that we don't have lingering state lying around
 				discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
+
+				return wasPendingCheckpoint;
 			}
 		}
+	}
 
-		// send the confirmation messages to the necessary targets. we do this here
-		// to be outside the lock scope
-		if (completed != null) {
-			final long timestamp = completed.getTimestamp();
+	/**
+	 * Try to complete the given pending checkpoint.
+	 *
+	 * Important: This method should only be called in the checkpoint lock scope.
+	 *
+	 * @param pendingCheckpoint to complete
+	 * @throws CheckpointException if the completion failed
+	 */
+	private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
+		final long checkpointId = pendingCheckpoint.getCheckpointId();
+		CompletedCheckpoint completedCheckpoint = null;
 
-			for (ExecutionVertex ev : tasksToCommitTo) {
-				Execution ee = ev.getCurrentExecutionAttempt();
-				if (ee != null) {
-					ee.notifyCheckpointComplete(checkpointId, timestamp);
-				}
+		try {
+			completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+
+			completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+			rememberRecentCheckpointId(checkpointId);
+			dropSubsumedCheckpoints(checkpointId);
+		} catch (Exception exception) {
+			// abort the current pending checkpoint if it has not been discarded yet
+			if (!pendingCheckpoint.isDiscarded()) {
+				pendingCheckpoint.abortError(exception);
 			}
 
-			statsTracker.onCompletedCheckpoint(completed);
+			if (completedCheckpoint != null) {
+				// we failed to store the completed checkpoint. Let's clean up
+				final CompletedCheckpoint cc = completedCheckpoint;
+
+				executor.execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							cc.discard();
+						} catch (Exception nestedException) {
+							LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
+						}
+					}
+				});
+			}
+
+			throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+		} finally {
+			pendingCheckpoints.remove(checkpointId);
+
+			triggerQueuedRequests();
 		}
+		
+		// record the time when this was completed, to calculate
+		// the 'min delay between checkpoints'
+		lastCheckpointCompletionNanos = System.nanoTime();
+
+		LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
-		return isPendingCheckpoint;
+		if (LOG.isDebugEnabled()) {
+			StringBuilder builder = new StringBuilder();
+			builder.append("Checkpoint state: ");
+			for (TaskState state : completedCheckpoint.getTaskStates().values()) {
+				builder.append(state);
+				builder.append(", ");
+			}
+			// Remove last two chars ", "
+			builder.delete(builder.length() - 2, builder.length());
+
+			LOG.debug(builder.toString());
+		}
+
+		final long timestamp = completedCheckpoint.getTimestamp();
+
+		for (ExecutionVertex ev : tasksToCommitTo) {
+			Execution ee = ev.getCurrentExecutionAttempt();
+			if (ee != null) {
+				ee.notifyCheckpointComplete(checkpointId, timestamp);
+			}
+		}
+
+		statsTracker.onCompletedCheckpoint(completedCheckpoint);
 	}
 
 	private void rememberRecentCheckpointId(long id) {
@@ -958,22 +989,34 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	/**
+	 * Asynchronously discards the given state object, which belongs to the given job, execution
+	 * attempt id and checkpoint id.
+	 *
+	 * @param jobId identifying the job to which the state object belongs
+	 * @param executionAttemptID identifying the task to which the state object belongs
+	 * @param checkpointId of the state object
+	 * @param stateObject to discard asynchronously
+	 */
 	private void discardState(
-		final JobID jobId,
-		final ExecutionAttemptID executionAttemptID,
-		final long checkpointId,
-		final StateObject stateObject) {
-		executor.execute(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					stateObject.discardState();
-				} catch (Exception e) {
+			final JobID jobId,
+			final ExecutionAttemptID executionAttemptID,
+			final long checkpointId,
+			final StateObject stateObject) {
+		
+		if (stateObject != null) {
+			executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						stateObject.discardState();
+					} catch (Throwable throwable) {
 					LOG.warn("Could not properly discard state object of checkpoint {} " +
 						"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId,
-						e);
+						throwable);
+					}
 				}
-			}
-		});
+			});
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
new file mode 100644
index 0000000..707878c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Base class for checkpoint related exceptions.
+ */
+public class CheckpointException extends Exception {
+
+	private static final long serialVersionUID = -4341865597039002540L;
+
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public CheckpointException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index ed65011..0e70b1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -151,7 +151,7 @@ public class CompletedCheckpoint implements Serializable {
 		}
 	}
 
-	private void discard() throws Exception {
+	void discard() throws Exception {
 		try {
 			if (externalPath != null) {
 				SavepointStore.removeSavepoint(externalPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index cfb59f6..e7df5bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,6 +35,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -198,47 +199,37 @@ public class PendingCheckpoint {
 		return onCompletionPromise;
 	}
 
-	public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+	public CompletedCheckpoint finalizeCheckpoint() {
 		synchronized (lock) {
-			try {
-				if (discarded) {
-					throw new IllegalStateException("pending checkpoint is discarded");
+			Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+
+			// Persist if required
+			String externalPath = null;
+			if (props.externalizeCheckpoint()) {
+				try {
+					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+					externalPath = SavepointStore.storeSavepoint(
+							targetDirectory,
+							savepoint);
+				} catch (IOException e) {
+					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
 				}
-				if (notYetAcknowledgedTasks.isEmpty()) {
-					// Persist if required
-					String externalPath = null;
-					if (props.externalizeCheckpoint()) {
-						try {
-							Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
-							externalPath = SavepointStore.storeSavepoint(
-									targetDirectory,
-									savepoint);
-						} catch (Throwable t) {
-							LOG.error("Failed to persist checkpoints " + checkpointId + ".", t);
-						}
-					}
+			}
 
-					CompletedCheckpoint completed = new CompletedCheckpoint(
-							jobId,
-							checkpointId,
-							checkpointTimestamp,
-							System.currentTimeMillis(),
-							new HashMap<>(taskStates),
-							props,
-							externalPath);
+			CompletedCheckpoint completed = new CompletedCheckpoint(
+					jobId,
+					checkpointId,
+					checkpointTimestamp,
+					System.currentTimeMillis(),
+					new HashMap<>(taskStates),
+					props,
+					externalPath);
 
-					onCompletionPromise.complete(completed);
+			onCompletionPromise.complete(completed);
 
-					dispose(false);
+			dispose(false);
 
-					return completed;
-				} else {
-					throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
-				}
-			} catch (Throwable t) {
-				onCompletionPromise.completeExceptionally(t);
-				throw t;
-			}
+			return completed;
 		}
 	}
 
@@ -378,9 +369,8 @@ public class PendingCheckpoint {
 	private void dispose(boolean releaseState) {
 		synchronized (lock) {
 			try {
-				discarded = true;
 				numAcknowledgedTasks = -1;
-				if (releaseState) {
+				if (!discarded && releaseState) {
 					executor.execute(new Runnable() {
 						@Override
 						public void run() {
@@ -395,6 +385,7 @@ public class PendingCheckpoint {
 
 				}
 			} finally {
+				discarded = true;
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
new file mode 100644
index 0000000..26db012
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PendingCheckpoint.class)
+public class CheckpointCoordinatorFailureTest extends TestLogger {
+
+	/**
+	 * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
+	 * will properly fail the originating pending checkpoint and clean up the completed checkpoint.
+	 */
+	@Test
+	public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+		JobID jid = new JobID();
+
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+		final long triggerTimestamp = 1L;
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new ExecutionVertex[]{vertex},
+			new StandaloneCheckpointIDCounter(),
+			new FailingCompletedCheckpointStore(),
+			null,
+			new DisabledCheckpointStatsTracker(),
+			Executors.directExecutor());
+
+		coord.triggerCheckpoint(triggerTimestamp, false);
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		assertFalse(pendingCheckpoint.isDiscarded());
+
+		final long checkpointId =coord.getPendingCheckpoints().keySet().iterator().next();
+
+		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp);
+		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData);
+
+		CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class);
+		PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
+
+		try {
+			coord.receiveAcknowledgeMessage(acknowledgeMessage);
+			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
+				"store the completed checkpoint.");
+		} catch (CheckpointException e) {
+			// ignore because we expected this exception
+		}
+
+		// make sure that the pending checkpoint has been discarded after we could not complete it
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		verify(completedCheckpoint).discard();
+	}
+
+	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+		@Override
+		public void recover() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+			throw new Exception("The failing completed checkpoint store failed again... :-(");
+		}
+
+		@Override
+		public CompletedCheckpoint getLatestCheckpoint() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void shutdown(JobStatus jobStatus) throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public int getNumberOfRetainedCheckpoints() {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 73eebcf..463c2ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2589,7 +2589,7 @@ public class CheckpointCoordinatorTest {
 		return executionJobVertex;
 	}
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+	static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(
 			attemptID,
 			new JobVertexID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 6b0d3f8..9f4bdba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -260,7 +260,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			}
 		}
 
-		private void discard() {
+		void discard() {
 			if (!isDiscarded) {
 				this.isDiscarded = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c42d258/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index e918965..be7b033 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -158,7 +158,6 @@ public class PendingCheckpointTest {
 		} catch (IllegalStateException ignored) {
 			// Expected
 		}
-		assertTrue(future.isDone());
 	}
 
 	/**


[3/4] flink git commit: [FLINK-5193] [jm] Harden job recovery in case of recovery failures

Posted by tr...@apache.org.
[FLINK-5193] [jm] Harden job recovery in case of recovery failures

When recovering multiple jobs, a single recovery failure caused none of the jobs to be recovered.
This PR changes this behaviour to make the recovery of jobs independent so that a single
failure won't stall the complete recovery. Furthermore, this PR improves the error reporting
for failures originating in the ZooKeeperSubmittedJobGraphStore.

Add test case

Fix failing JobManagerHACheckpointRecoveryITCase

This closes #2909.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/add3765d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/add3765d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/add3765d

Branch: refs/heads/master
Commit: add3765d1626a04fb98b8f36cb725eb32806d8b6
Parents: ea70807
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 29 17:31:08 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:42:58 2016 +0100

----------------------------------------------------------------------
 .../StandaloneSubmittedJobGraphStore.java       |  11 +-
 .../jobmanager/SubmittedJobGraphStore.java      |  19 +--
 .../ZooKeeperSubmittedJobGraphStore.java        | 109 +++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java    |  44 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  38 ++---
 .../jobmanager/JobManagerHARecoveryTest.java    | 161 ++++++++++++++++++-
 .../StandaloneSubmittedJobGraphStoreTest.java   |  11 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |  29 ++--
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 9 files changed, 307 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index 3041cde..d1ca1a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
@@ -54,12 +53,12 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	}
 
 	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
-		return Option.empty();
+	public Collection<JobID> getJobIds() throws Exception {
+		return Collections.emptyList();
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-		return Collections.emptyList();
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..55c2e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.Option;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * {@link SubmittedJobGraph} instances for recovery.
@@ -40,16 +38,11 @@ public interface SubmittedJobGraphStore {
 	void stop() throws Exception;
 
 	/**
-	 * Returns a list of all submitted {@link JobGraph} instances.
-	 */
-	List<SubmittedJobGraph> recoverJobGraphs() throws Exception;
-
-	/**
 	 * Returns the {@link SubmittedJobGraph} with the given {@link JobID}.
 	 *
 	 * <p>An Exception is thrown, if no job graph with the given ID exists.
 	 */
-	Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception;
+	SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception;
 
 	/**
 	 * Adds the {@link SubmittedJobGraph} instance.
@@ -64,6 +57,14 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Get all job ids of submitted job graphs to the submitted job graph store.
+	 *
+	 * @return Collection of submitted job ids
+	 * @throws Exception if the operation fails
+	 */
+	Collection<JobID> getJobIds() throws Exception;
+
+	/**
 	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
 	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index c1dc656..aaafa76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -24,18 +24,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -156,47 +153,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
-	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-		synchronized (cacheLock) {
-			verifyIsRunning();
-
-			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
-			List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
-
-			while (true) {
-				try {
-					submitted = jobGraphsInZooKeeper.getAll();
-					break;
-				}
-				catch (ConcurrentModificationException e) {
-					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
-				}
-			}
-
-			LOG.info("Found {} job graphs.", submitted.size());
-
-			if (submitted.size() != 0) {
-				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
-
-				for (Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
-					SubmittedJobGraph jobGraph = jobStateHandle.f0.retrieveState();
-					addedJobGraphs.add(jobGraph.getJobId());
-
-					jobGraphs.add(jobGraph);
-				}
-
-				LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
-				return jobGraphs;
-			}
-			else {
-				LOG.info("No job graph to recover.");
-				return Collections.emptyList();
-			}
-		}
-	}
-
-	@Override
-	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
@@ -205,17 +162,29 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			try {
-				SubmittedJobGraph jobGraph = jobGraphsInZooKeeper.get(path).retrieveState();
-				addedJobGraphs.add(jobGraph.getJobId());
+			RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
 
-				LOG.info("Recovered {}.", jobGraph);
-
-				return Option.apply(jobGraph);
+			try {
+				jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path);
+			} catch (KeeperException.NoNodeException ignored) {
+				return null;
+			} catch (Exception e) {
+				throw new Exception("Could not retrieve the submitted job graph state handle " +
+					"for " + path + "from the submitted job graph store.", e);
 			}
-			catch (KeeperException.NoNodeException ignored) {
-				return Option.empty();
+			SubmittedJobGraph jobGraph;
+
+			try {
+				jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+			} catch (Exception e) {
+				throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
 			}
+
+			addedJobGraphs.add(jobGraph.getJobId());
+
+			LOG.info("Recovered {}.", jobGraph);
+
+			return jobGraph;
 		}
 	}
 
@@ -283,6 +252,31 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
+	@Override
+	public Collection<JobID> getJobIds() throws Exception {
+		Collection<String> paths;
+
+		LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);
+		// A failure while listing the entry paths is rethrown with context; no partial result is returned.
+		try {
+			paths = jobGraphsInZooKeeper.getAllPaths();
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
+		}
+
+		List<JobID> jobIds = new ArrayList<>(paths.size());
+		// A path that cannot be parsed is logged and skipped instead of failing the whole call.
+		for (String path : paths) {
+			try {
+				jobIds.add(jobIdfromPath(path));
+			} catch (Exception exception) {
+				LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception);
+			}
+		}
+
+		return jobIds;
+	}
+
 	/**
 	 * Monitors ZooKeeper for changes.
 	 *
@@ -405,4 +399,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		return String.format("/%s", jobId);
 	}
 
+	/**
+	 * Parses the given ZooKeeper path into a JobID via {@link JobID#fromHexString(String)}.
+	 * NOTE(review): the path is parsed unchanged, so presumably it must be the bare hex id without a leading '/' - confirm.
+	 * @param path in ZooKeeper
+	 * @return JobID associated with the given path
+	 */
+	public static JobID jobIdfromPath(final String path) {
+		return JobID.fromHexString(path);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 14d9f6f..dd32efb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -30,8 +30,11 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -225,8 +228,45 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		byte[] data = client.getData().forPath(pathInZooKeeper);
-		return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+		byte[] data;
+
+		try {
+			data = client.getData().forPath(pathInZooKeeper);
+		} catch (Exception e) {
+			throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
+				" from ZooKeeper.", e);
+		}
+
+		try {
+			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new Exception("Failed to deserialize state handle from ZooKeeper data from " +
+				pathInZooKeeper + '.', e);
+		}
+	}
+
+	/**
+	 * Returns the children of the path "/" obtained through {@code client}.
+	 * If no node exists at that path, an empty list is returned.
+	 * @return Collection of the child entries found under "/"
+	 * @throws Exception if a ZooKeeper operation fails
+	 */
+	public Collection<String> getAllPaths() throws Exception {
+		final String path = "/";
+		// Re-check existence on every iteration: the node may vanish between checkExists and getChildren.
+		while(true) {
+			Stat stat = client.checkExists().forPath(path);
+
+			if (stat == null) {
+				return Collections.emptyList();
+			} else {
+				try {
+					return client.getChildren().forPath(path);
+				} catch (KeeperException.NoNodeException ignored) {
+					// Concurrent deletion, retry
+				}
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 982efe8..1dfd3db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -495,6 +495,8 @@ class JobManager(
 
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+        log.info(s"Submitting recovered job ${submittedJobGraph.getJobId}.")
+
         submitJob(
           submittedJobGraph.getJobGraph(),
           submittedJobGraph.getJobInfo(),
@@ -516,7 +518,7 @@ class JobManager(
             log.info(s"Attempting to recover job $jobId.")
             val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-            submittedJobGraphOption match {
+            Option(submittedJobGraphOption) match {
               case Some(submittedJobGraph) =>
                 if (!leaderElectionService.hasLeadership()) {
                   // we've lost leadership. mission: abort.
@@ -529,37 +531,31 @@ class JobManager(
             }
           }
         } catch {
-          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
+          case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
     case RecoverAllJobs =>
       future {
-        try {
-          // The ActorRef, which is part of the submitted job graph can only be
-          // de-serialized in the scope of an actor system.
-          akka.serialization.JavaSerializer.currentSystem.withValue(
-            context.system.asInstanceOf[ExtendedActorSystem]) {
-
-            log.info(s"Attempting to recover all jobs.")
+        log.info("Attempting to recover all jobs.")
 
-            val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+        try {
+          val jobIdsToRecover = submittedJobGraphs.getJobIds().asScala
 
-            if (!leaderElectionService.hasLeadership()) {
-              // we've lost leadership. mission: abort.
-              log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
-                s"jobs.")
-            } else {
-              log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
+          if (jobIdsToRecover.isEmpty) {
+            log.info("There are no jobs to recover.")
+          } else {
+            log.info(s"There are ${jobIdsToRecover.size} jobs to recover. Starting the job " +
+                       s"recovery.")
 
-              jobGraphs.foreach{
-                submittedJobGraph =>
-                  self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
+            jobIdsToRecover foreach {
+              jobId => self ! decorateMessage(RecoverJob(jobId))
             }
           }
         } catch {
-          case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
+          case e: Exception =>
+            log.warn("Failed to recover job ids from submitted job graph store. Aborting " +
+                       "recovery.", e)
         }
       }(context.dispatcher)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 69aac31..36412f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.japi.pf.FI;
+import akka.japi.pf.ReceiveBuilder;
+import akka.pattern.Patterns;
+import akka.testkit.CallingThreadDispatcher;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -31,6 +36,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -39,8 +45,10 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -53,10 +61,12 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -77,25 +87,35 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Int;
 import scala.Option;
+import scala.PartialFunction;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.runtime.BoxedUnit;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class JobManagerHARecoveryTest {
 
@@ -296,6 +316,131 @@ public class JobManagerHARecoveryTest {
 	}
 
 	/**
+	 * Tests that a failing job recovery won't cause other job recoveries to fail.
+	 */
+	@Test
+	public void testFailingJobRecovery() throws Exception {
+		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+		final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
+		Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow();
+		final Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		ActorRef jobManager = null;
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		// set HA mode to zookeeper so that we try to recover jobs
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		try {
+			final SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+
+			SubmittedJobGraph submittedJobGraph = mock(SubmittedJobGraph.class);
+			when(submittedJobGraph.getJobId()).thenReturn(jobId2);
+
+			when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(jobId1, jobId2));
+
+			// fail the first job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId1))).thenThrow(new Exception("Test exception"));
+			// succeed the second job recovery
+			when(submittedJobGraphStore.recoverJobGraph(eq(jobId2))).thenReturn(submittedJobGraph);
+
+			final TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+			// TestingFailingHAJobManager adds the job id of every RecoverSubmittedJob message to this collection
+			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
+
+			Props jobManagerProps = Props.create(
+				TestingFailingHAJobManager.class,
+				flinkConfiguration,
+				Executors.directExecutor(),
+				Executors.directExecutor(),
+				mock(InstanceManager.class),
+				mock(Scheduler.class),
+				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+				ActorRef.noSender(),
+				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+				timeout,
+				myLeaderElectionService,
+				submittedJobGraphStore,
+				mock(CheckpointRecoveryFactory.class),
+				jobRecoveryTimeout,
+				Option.<MetricRegistry>apply(null),
+				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
+
+			jobManager = system.actorOf(jobManagerProps, "jobmanager");
+			// send an Identify message and wait for the ask to complete before granting leadership
+			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
+
+			Await.ready(started, deadline.timeLeft());
+
+			// make the job manager the leader --> this triggers the recovery of all jobs
+			myLeaderElectionService.isLeader(leaderSessionID);
+
+			// check that we have successfully recovered the second job
+			assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+		} finally {
+			TestingUtils.stopActor(jobManager);
+		}
+	}
+
+	static class TestingFailingHAJobManager extends JobManager {
+		// JobManager subclass for tests: records the job id of RecoverSubmittedJob messages instead of handling them.
+		private final Collection<JobID> recoveredJobs;
+
+		public TestingFailingHAJobManager(
+			Configuration flinkConfiguration,
+			Executor futureExecutor,
+			Executor ioExecutor,
+			InstanceManager instanceManager,
+			Scheduler scheduler,
+			BlobLibraryCacheManager libraryCacheManager,
+			ActorRef archive,
+			RestartStrategyFactory restartStrategyFactory,
+			FiniteDuration timeout,
+			LeaderElectionService leaderElectionService,
+			SubmittedJobGraphStore submittedJobGraphs,
+			CheckpointRecoveryFactory checkpointRecoveryFactory,
+			FiniteDuration jobRecoveryTimeout,
+			Option<MetricRegistry> metricsRegistry,
+			Collection<JobID> recoveredJobs) {
+			super(
+				flinkConfiguration,
+				futureExecutor,
+				ioExecutor,
+				instanceManager,
+				scheduler,
+				libraryCacheManager,
+				archive,
+				restartStrategyFactory,
+				timeout,
+				leaderElectionService,
+				submittedJobGraphs,
+				checkpointRecoveryFactory,
+				jobRecoveryTimeout,
+				metricsRegistry);
+
+			this.recoveredJobs = recoveredJobs;
+		}
+		// RecoverSubmittedJob is intercepted here; every other message is forwarded to the parent handler.
+		@Override
+		public PartialFunction<Object, BoxedUnit> handleMessage() {
+			return ReceiveBuilder.match(
+				JobManagerMessages.RecoverSubmittedJob.class,
+				new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
+					@Override
+					public void apply(JobManagerMessages.RecoverSubmittedJob submitJob) throws Exception {
+						recoveredJobs.add(submitJob.submittedJobGraph().getJobId());
+					}
+				}).matchAny(new FI.UnitApply<Object>() {
+				@Override
+				public void apply(Object o) throws Exception {
+					TestingFailingHAJobManager.super.handleMessage().apply(o);
+				}
+			}).build();
+		}
+	}
+
+	/**
 	 * A checkpoint store, which supports shutdown and suspend. You can use this to test HA
 	 * as long as the factory always returns the same store instance.
 	 */
@@ -391,16 +536,11 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
-			return new ArrayList<>(storedJobs.values());
-		}
-
-		@Override
-		public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 			if (storedJobs.containsKey(jobId)) {
-				return Option.apply(storedJobs.get(jobId));
+				return storedJobs.get(jobId);
 			} else {
-				return Option.apply(null);
+				return null;
 			}
 		}
 
@@ -414,6 +554,11 @@ public class JobManagerHARecoveryTest {
 			storedJobs.remove(jobId);
 		}
 
+		@Override
+		public Collection<JobID> getJobIds() throws Exception {
+			return storedJobs.keySet();
+		}
+
 		boolean contains(JobID jobId) {
 			return storedJobs.containsKey(jobId);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
index 8ebb7f8..079a10e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStoreTest.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 public class StandaloneSubmittedJobGraphStoreTest {
 
@@ -41,14 +40,14 @@ public class StandaloneSubmittedJobGraphStoreTest {
 				new JobGraph("testNoOps"),
 				new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
 
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.putJobGraph(jobGraph);
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
 		jobGraphs.removeJobGraph(jobGraph.getJobGraph().getJobID());
-		assertEquals(0, jobGraphs.recoverJobGraphs().size());
+		assertEquals(0, jobGraphs.getJobIds().size());
 
-		assertTrue(jobGraphs.recoverJobGraph(new JobID()).isEmpty());
+		assertNull(jobGraphs.recoverJobGraph(new JobID()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index d156f02..9454d90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -40,8 +40,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
@@ -101,32 +101,36 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Add initial
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify initial job graph
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			Collection<JobID> jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			JobID jobId = jobIds.iterator().next();
+
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Update (same ID)
 			jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
 			jobGraphs.putJobGraph(jobGraph);
 
 			// Verify updated
-			actual = jobGraphs.recoverJobGraphs();
-			assertEquals(1, actual.size());
+			jobIds = jobGraphs.getJobIds();
+			assertEquals(1, jobIds.size());
+
+			jobId = jobIds.iterator().next();
 
-			verifyJobGraphs(jobGraph, actual.get(0));
+			verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
 			// Remove
 			jobGraphs.removeJobGraph(jobGraph.getJobId());
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(1)).onAddedJobGraph(any(JobID.class));
@@ -162,11 +166,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 				jobGraphs.putJobGraph(jobGraph);
 			}
 
-			List<SubmittedJobGraph> actual = jobGraphs.recoverJobGraphs();
+			Collection<JobID> actual = jobGraphs.getJobIds();
 
 			assertEquals(expected.size(), actual.size());
 
-			for (SubmittedJobGraph jobGraph : actual) {
+			for (JobID jobId : actual) {
+				SubmittedJobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
 				assertTrue(expected.containsKey(jobGraph.getJobId()));
 
 				verifyJobGraphs(expected.get(jobGraph.getJobId()), jobGraph);
@@ -175,7 +180,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 			}
 
 			// Empty state
-			assertEquals(0, jobGraphs.recoverJobGraphs().size());
+			assertEquals(0, jobGraphs.getJobIds().size());
 
 			// Nothing should have been notified
 			verify(listener, atMost(expected.size())).onAddedJobGraph(any(JobID.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/add3765d/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 49eaeb7..3f08b5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -370,7 +370,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				nonLeadingJobManagerProcess = jobManagerProcess[0];
 			}
 
-			// BLocking JobGraph
+			// Blocking JobGraph
 			JobVertex blockingVertex = new JobVertex("Blocking vertex");
 			blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
 			JobGraph jobGraph = new JobGraph(blockingVertex);
@@ -399,7 +399,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				String output = nonLeadingJobManagerProcess.getProcessOutput();
 
 				if (output != null) {
-					if (output.contains("Fatal error: Failed to recover jobs") &&
+					if (output.contains("Failed to recover job") &&
 							output.contains("java.io.FileNotFoundException")) {
 
 						success = true;


[2/4] flink git commit: [FLINK-5278] Improve task and checkpoint related logging

Posted by tr...@apache.org.
[FLINK-5278] Improve task and checkpoint related logging

Add more logging

This closes #2959.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea708071
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea708071
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea708071

Branch: refs/heads/master
Commit: ea7080712f2dcbdf125b806007c80aa3d120f30a
Parents: d3f19a5
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 16:22:23 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:42:13 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  32 ++--
 .../runtime/checkpoint/CompletedCheckpoint.java |   3 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  75 ++++++++-
 .../flink/runtime/executiongraph/Execution.java |   9 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 165 ++++++++++++-------
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  36 ++++
 .../streaming/runtime/tasks/StreamTask.java     |  25 ++-
 7 files changed, 260 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8ca4b2e..5f0fd74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -613,6 +613,7 @@ public class CheckpointCoordinator {
 		if (shutdown || message == null) {
 			return false;
 		}
+
 		if (!job.equals(message.getJob())) {
 			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
 			return false;
@@ -641,6 +642,9 @@ public class CheckpointCoordinator {
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
 					case SUCCESS:
+						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
+							checkpointId, message.getTaskExecutionId(), message.getJob());
+
 						if (checkpoint.isFullyAcknowledged()) {
 
 							// record the time when this was completed, to calculate
@@ -651,8 +655,8 @@ public class CheckpointCoordinator {
 							completed = checkpoint.finalizeCheckpoint();
 							completedCheckpointStore.addCheckpoint(completed);
 
-							LOG.info("Completed checkpoint " + checkpointId + " (in " +
-								completed.getDuration() + " ms)");
+							LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+								completed.getStateSize(), completed.getDuration());
 
 							if (LOG.isDebugEnabled()) {
 								StringBuilder builder = new StringBuilder();
@@ -685,7 +689,7 @@ public class CheckpointCoordinator {
 								"the state handle to avoid lingering state.", message.getCheckpointId(),
 							message.getTaskExecutionId(), message.getJob());
 
-						discardState(message.getSubtaskState());
+						discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
 
 						break;
 					case DISCARDED:
@@ -694,7 +698,7 @@ public class CheckpointCoordinator {
 								"state handle tp avoid lingering state.",
 							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-						discardState(message.getSubtaskState());
+						discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
 				}
 			}
 			else if (checkpoint != null) {
@@ -706,15 +710,17 @@ public class CheckpointCoordinator {
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					isPendingCheckpoint = true;
-					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
+					LOG.warn("Received late message for now expired checkpoint attempt {} from " +
+						"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
 				}
 				else {
-					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
+					LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
+						checkpointId, message.getTaskExecutionId(), message.getJob());
 					isPendingCheckpoint = false;
 				}
 
 				// try to discard the state so that we don't have lingering state lying around
-				discardState(message.getSubtaskState());
+				discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
 			}
 		}
 
@@ -947,19 +953,25 @@ public class CheckpointCoordinator {
 				triggerCheckpoint(System.currentTimeMillis(), true);
 			}
 			catch (Exception e) {
-				LOG.error("Exception while triggering checkpoint", e);
+				LOG.error("Exception while triggering checkpoint.", e);
 			}
 		}
 	}
 
-	private void discardState(final StateObject stateObject) {
+	private void discardState(
+		final JobID jobId,
+		final ExecutionAttemptID executionAttemptID,
+		final long checkpointId,
+		final StateObject stateObject) {
 		executor.execute(new Runnable() {
 			@Override
 			public void run() {
 				try {
 					stateObject.discardState();
 				} catch (Exception e) {
-					LOG.warn("Could not properly discard state object.", e);
+					LOG.warn("Could not properly discard state object of checkpoint {} " +
+						"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId,
+						e);
 				}
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 3c33ce3..ed65011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -164,7 +163,7 @@ public class CompletedCheckpoint implements Serializable {
 		}
 	}
 
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		long result = 0L;
 
 		for (TaskState taskState : taskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4add504..fdd0d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -163,7 +163,17 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
 					.get(numberOfInitialCheckpoints - 1);
 
-			CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState();
+			CompletedCheckpoint latestCheckpoint;
+			long checkpointId = pathToCheckpointId(latest.f1);
+
+			LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
+
+			try {
+				latestCheckpoint = latest.f0.retrieveState();
+			} catch (Exception e) {
+				throw new Exception("Could not retrieve the completed checkpoint " + checkpointId +
+				" from the state storage.", e);
+			}
 
 			checkpointStateHandles.add(latest);
 
@@ -190,7 +200,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		checkNotNull(checkpoint, "Checkpoint");
 
 		// First add the new one. If it fails, we don't want to loose existing data.
-		String path = String.format("/%s", checkpoint.getCheckpointID());
+		String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
 		final RetrievableStateHandle<CompletedCheckpoint> stateHandle =
 				checkpointsInZooKeeper.add(path, checkpoint);
@@ -298,14 +308,36 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		BackgroundCallback callback = new BackgroundCallback() {
 			@Override
 			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+				final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);
+
 				try {
 					if (event.getType() == CuratorEventType.DELETE) {
 						if (event.getResultCode() == 0) {
+							Exception exception = null;
+
 							try {
 								action.call();
-							} finally {
+							} catch (Exception e) {
+								exception = new Exception("Could not execute callable action " +
+									"for checkpoint " + checkpointId + '.', e);
+							}
+
+							try {
 								// Discard the state handle
 								stateHandleAndPath.f0.discardState();
+							} catch (Exception e) {
+								Exception newException = new Exception("Could not discard meta " +
+									"data for completed checkpoint " + checkpointId + '.', e);
+
+								if (exception == null) {
+									exception = newException;
+								} else {
+									exception.addSuppressed(newException);
+								}
+							}
+
+							if (exception != null) {
+								throw exception;
 							}
 						} else {
 							throw new IllegalStateException("Unexpected result code " +
@@ -316,7 +348,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 								event.getType() + " in '" + event + "' callback.");
 					}
 				} catch (Exception e) {
-					LOG.error("Failed to discard checkpoint.", e);
+					LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
 				}
 			}
 		};
@@ -326,4 +358,39 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// inconsistent state.
 		checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
 	}
+
+	/**
+	 * Convert a checkpoint id into a ZooKeeper path.
+	 *
+	 * @param checkpointId to convert to the path
+	 * @return Path created from the given checkpoint id
+	 */
+	protected static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%s", checkpointId);
+	}
+
+	/**
+	 * Converts a path to the checkpoint id.
+	 *
+	 * @param path in ZooKeeper
+	 * @return Checkpoint id parsed from the path
+	 */
+	protected static long pathToCheckpointId(String path) {
+		try {
+			String numberString;
+
+			// check if we have a leading slash
+			if ('/' == path.charAt(0) ) {
+				numberString = path.substring(1);
+			} else {
+				numberString = path;
+			}
+			return Long.parseLong(numberString);
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse checkpoint id from {}. This indicates that the " +
+				"checkpoint id to path conversion has changed.", path);
+
+			return -1L;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 219d71d..16aebce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1015,14 +1015,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
 		// sanity check
 		if (currentState.isTerminal()) {
-			throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + ".");
+			throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + '.');
 		}
 
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
 
-			LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " ("  + getAttemptId() + ") switched from "
-				+ currentState + " to " + targetState);
+			if (error == null) {
+				LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState);
+			} else {
+				LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error);
+			}
 
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 14ef1bf..184c3b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -504,7 +504,7 @@ public class Task implements Runnable, TaskActions {
 		while (true) {
 			ExecutionState current = this.executionState;
 			if (current == ExecutionState.CREATED) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+				if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
 					// success, we can start our work
 					break;
 				}
@@ -515,14 +515,14 @@ public class Task implements Runnable, TaskActions {
 				return;
 			}
 			else if (current == ExecutionState.CANCELING) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+				if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
 					// we were immediately canceled. tell the TaskManager that we reached our final state
 					notifyFinalState();
 					return;
 				}
 			}
 			else {
-				throw new IllegalStateException("Invalid state for beginning of task operation");
+				throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
 			}
 		}
 
@@ -543,7 +543,7 @@ public class Task implements Runnable, TaskActions {
 
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
-			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+			LOG.info("Loading JAR files for task {}.", this);
 
 			userCodeClassLoader = createUserCodeClassloader(libraryCache);
 			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
@@ -572,7 +572,7 @@ public class Task implements Runnable, TaskActions {
 			// the registration must also strictly be undone
 			// ----------------------------------------------------------------
 
-			LOG.info("Registering task at network: " + this);
+			LOG.info("Registering task at network: {}.", this);
 
 			network.registerTask(this);
 
@@ -581,13 +581,15 @@ public class Task implements Runnable, TaskActions {
 				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
 						DistributedCache.readFileInfoFromConfig(jobConfiguration))
 				{
-					LOG.info("Obtaining local cache file for '" + entry.getKey() + '\'');
+					LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
 					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
 					distributedCacheEntries.put(entry.getKey(), cp);
 				}
 			}
 			catch (Exception e) {
-				throw new Exception("Exception while adding files to distributed cache.", e);
+				throw new Exception(
+					String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
+					e);
 			}
 
 			if (isCanceledOrFailed()) {
@@ -638,7 +640,7 @@ public class Task implements Runnable, TaskActions {
 			this.invokable = invokable;
 
 			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
-			if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 				throw new CancelTaskException();
 			}
 
@@ -671,7 +673,7 @@ public class Task implements Runnable, TaskActions {
 
 			// try to mark the task as finished
 			// if that fails, the task was canceled/failed in the meantime
-			if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+			if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
 				notifyObservers(ExecutionState.FINISHED, null);
 			}
 			else {
@@ -694,7 +696,7 @@ public class Task implements Runnable, TaskActions {
 
 					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
 						if (t instanceof CancelTaskException) {
-							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+							if (transitionState(current, ExecutionState.CANCELED)) {
 								cancelInvokable();
 
 								notifyObservers(ExecutionState.CANCELED, null);
@@ -702,19 +704,19 @@ public class Task implements Runnable, TaskActions {
 							}
 						}
 						else {
-							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+							if (transitionState(current, ExecutionState.FAILED, t)) {
 								// proper failure of the task. record the exception as the root cause
-								LOG.error("Task execution failed. ", t);
+								String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId);
 								failureCause = t;
 								cancelInvokable();
 
-								notifyObservers(ExecutionState.FAILED, t);
+								notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
 								break;
 							}
 						}
 					}
 					else if (current == ExecutionState.CANCELING) {
-						if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+						if (transitionState(current, ExecutionState.CANCELED)) {
 							notifyObservers(ExecutionState.CANCELED, null);
 							break;
 						}
@@ -724,22 +726,22 @@ public class Task implements Runnable, TaskActions {
 						break;
 					}
 					// unexpected state, go to failed
-					else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-						LOG.error("Unexpected state in Task during an exception: " + current);
+					else if (transitionState(current, ExecutionState.FAILED, t)) {
+						LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
 						break;
 					}
 					// else fall through the loop and
 				}
 			}
 			catch (Throwable tt) {
-				String message = "FATAL - exception in task exception handler";
+				String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
 				LOG.error(message, tt);
 				notifyFatalError(message, tt);
 			}
 		}
 		finally {
 			try {
-				LOG.info("Freeing task resources for " + taskNameWithSubtask);
+				LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
 
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
@@ -767,7 +769,7 @@ public class Task implements Runnable, TaskActions {
 			}
 			catch (Throwable t) {
 				// an error in the resource cleanup is fatal
-				String message = "FATAL - exception in task resource cleanup";
+				String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
 				LOG.error(message, t);
 				notifyFatalError(message, t);
 			}
@@ -779,7 +781,7 @@ public class Task implements Runnable, TaskActions {
 				metrics.close();
 			}
 			catch (Throwable t) {
-				LOG.error("Error during metrics de-registration", t);
+				LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
 			}
 		}
 	}
@@ -845,6 +847,39 @@ public class Task implements Runnable, TaskActions {
 		taskManagerConnection.notifyFatalError(message, cause);
 	}
 
+	/**
+	 * Try to transition the execution state from the current state to the new state.
+	 *
+	 * @param currentState of the execution
+	 * @param newState of the execution
+	 * @return true if the transition was successful, otherwise false
+	 */
+	private boolean transitionState(ExecutionState currentState, ExecutionState newState) {
+		return transitionState(currentState, newState, null);
+	}
+
+	/**
+	 * Try to transition the execution state from the current state to the new state.
+	 *
+	 * @param currentState of the execution
+	 * @param newState of the execution
+	 * @param cause of the transition change or null
+	 * @return true if the transition was successful, otherwise false
+	 */
+	private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
+		if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
+			if (cause == null) {
+				LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
+			} else {
+				LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
+			}
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
 	// ----------------------------------------------------------------------------------------------------------------
 	//  Stopping / Canceling / Failing the task from the outside
 	// ----------------------------------------------------------------------------------------------------------------
@@ -859,22 +894,22 @@ public class Task implements Runnable, TaskActions {
 	 *             if the {@link AbstractInvokable} does not implement {@link StoppableTask}
 	 */
 	public void stopExecution() throws UnsupportedOperationException {
-		LOG.info("Attempting to stop task " + taskNameWithSubtask);
-		if(this.invokable instanceof StoppableTask) {
+		LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);
+		if (invokable instanceof StoppableTask) {
 			Runnable runnable = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						((StoppableTask)Task.this.invokable).stop();
+						((StoppableTask)invokable).stop();
 					} catch(RuntimeException e) {
-						LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
+						LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
 						taskManagerConnection.failTask(executionId, e);
 					}
 				}
 			};
-			executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
+			executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
 		} else {
-			throw new UnsupportedOperationException("Stopping not supported by this task.");
+			throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
 		}
 	}
 
@@ -887,7 +922,7 @@ public class Task implements Runnable, TaskActions {
 	 * <p>This method never blocks.</p>
 	 */
 	public void cancelExecution() {
-		LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+		LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
 		cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
 	}
 
@@ -902,37 +937,52 @@ public class Task implements Runnable, TaskActions {
 	 */
 	@Override
 	public void failExternally(Throwable cause) {
-		LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
+		LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
 		cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
 	}
 
 	private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
 		while (true) {
-			ExecutionState current = this.executionState;
+			ExecutionState current = executionState;
 
 			// if the task is already canceled (or canceling) or finished or failed,
 			// then we need not do anything
 			if (current.isTerminal() || current == ExecutionState.CANCELING) {
-				LOG.info("Task " + taskNameWithSubtask + " is already in state " + current);
+				LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
 				return;
 			}
 
 			if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
-				if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
+				if (transitionState(current, targetState, cause)) {
 					// if we manage this state transition, then the invokable gets never called
 					// we need not call cancel on it
 					this.failureCause = cause;
-					notifyObservers(targetState, cause);
+					notifyObservers(
+						targetState,
+						new Exception(
+							String.format(
+								"Cancel or fail execution of %s (%s).",
+								taskNameWithSubtask,
+								executionId),
+							cause));
 					return;
 				}
 			}
 			else if (current == ExecutionState.RUNNING) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
+				if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
 					// we are canceling / failing out of the running state
 					// we need to cancel the invokable
 					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
 						this.failureCause = cause;
-						notifyObservers(targetState, cause);
+						notifyObservers(
+							targetState,
+							new Exception(
+								String.format(
+									"Cancel or fail execution of %s (%s).",
+									taskNameWithSubtask,
+									executionId),
+								cause));
+
 						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
 						// because the canceling may block on user code, we cancel from a separate thread
@@ -951,7 +1001,7 @@ public class Task implements Runnable, TaskActions {
 								producedPartitions,
 								inputGates);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
-								"Canceler for " + taskNameWithSubtask);
+								String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
 						cancelThread.setDaemon(true);
 						cancelThread.start();
 					}
@@ -959,7 +1009,8 @@ public class Task implements Runnable, TaskActions {
 				}
 			}
 			else {
-				throw new IllegalStateException("Unexpected task state: " + current);
+				throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
+					current, taskNameWithSubtask, executionId));
 			}
 		}
 	}
@@ -973,13 +1024,6 @@ public class Task implements Runnable, TaskActions {
 	}
 
 	private void notifyObservers(ExecutionState newState, Throwable error) {
-		if (error == null) {
-			LOG.info(taskNameWithSubtask + " switched to " + newState);
-		}
-		else {
-			LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error);
-		}
-
 		TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
 
 		for (TaskExecutionStateListener listener : taskExecutionStateListeners) {
@@ -1066,24 +1110,29 @@ public class Task implements Runnable, TaskActions {
 						catch (Throwable t) {
 							if (getExecutionState() == ExecutionState.RUNNING) {
 								failExternally(new Exception(
-									"Error while triggering checkpoint for " + taskName,
-									t));
+									"Error while triggering checkpoint " + checkpointID + " for " +
+										taskNameWithSubtask, t));
+							} else {
+								LOG.debug("Encountered error while triggering checkpoint {} for " +
+									"{} ({}) while being not in state running.", checkpointID,
+									taskNameWithSubtask, executionId, t);
 							}
 						}
 					}
 				};
-				executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
+				executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 			}
 			else {
 				checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
 						new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+				
+				LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
+						taskNameWithSubtask, executionId);
 
-				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
-						+ taskNameWithSubtask);
 			}
 		}
 		else {
-			LOG.debug("Declining checkpoint request for non-running task");
+			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
 
 			// send back a message that we did not do the checkpoint
 			checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
@@ -1120,12 +1169,12 @@ public class Task implements Runnable, TaskActions {
 				executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
 			}
 			else {
-				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - "
-						+ taskNameWithSubtask);
+				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.",
+						taskNameWithSubtask);
 			}
 		}
 		else {
-			LOG.debug("Ignoring checkpoint commit notification for non-running task.");
+			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
 		}
 	}
 
@@ -1228,14 +1277,14 @@ public class Task implements Runnable, TaskActions {
 				invokable.cancel();
 			}
 			catch (Throwable t) {
-				LOG.error("Error while canceling task " + taskNameWithSubtask, t);
+				LOG.error("Error while canceling task {}.", taskNameWithSubtask, t);
 			}
 		}
 	}
 
 	@Override
 	public String toString() {
-		return taskNameWithSubtask + " [" + executionState + ']';
+		return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState);
 	}
 
 	/**
@@ -1312,7 +1361,7 @@ public class Task implements Runnable, TaskActions {
 				try {
 					invokable.cancel();
 				} catch (Throwable t) {
-					logger.error("Error while canceling the task", t);
+					logger.error("Error while canceling the task {}.", taskName, t);
 				}
 
 				// Early release of input and output buffer pools. We do this
@@ -1326,7 +1375,7 @@ public class Task implements Runnable, TaskActions {
 					try {
 						partition.destroyBufferPool();
 					} catch (Throwable t) {
-						LOG.error("Failed to release result partition buffer pool.", t);
+						LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t);
 					}
 				}
 
@@ -1334,7 +1383,7 @@ public class Task implements Runnable, TaskActions {
 					try {
 						inputGate.releaseAllResources();
 					} catch (Throwable t) {
-						LOG.error("Failed to release input gate.", t);
+						LOG.error("Failed to release input gate for task {}.", taskName, t);
 					}
 				}
 
@@ -1352,7 +1401,7 @@ public class Task implements Runnable, TaskActions {
 					watchDogThread.join();
 				}
 			} catch (Throwable t) {
-				logger.error("Error in the task canceler", t);
+				logger.error("Error in the task canceler for task {}.", taskName, t);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..6ee0141
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
+
+	/** Round-trips a checkpoint id through its path form and expects the same id back. */
+	@Test
+	public void testPathConversion() {
+		final long expectedId = 42L;
+		final String encodedPath = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(expectedId);
+		assertEquals(
+			expectedId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(encodedPath));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 54f6c10..88a29ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -211,7 +211,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		boolean disposed = false;
 		try {
 			// -------- Initialize ---------
-			LOG.debug("Initializing {}", getName());
+			LOG.debug("Initializing {}.", getName());
 
 			asyncOperationsThreadPool = Executors.newCachedThreadPool();
 
@@ -528,8 +528,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
 			if (isRunning) {
-				throw e;
+				throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
+					" for operator " + getName() + '.', e);
 			} else {
+				LOG.debug("Could not perform checkpoint {} for operator {} while the " +
+					"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
 				return false;
 			}
 		}
@@ -541,10 +544,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			performCheckpoint(checkpointMetaData);
 		}
 		catch (CancelTaskException e) {
-			throw e;
+			throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
+				checkpointMetaData.getCheckpointId() + '.', e);
 		}
 		catch (Exception e) {
-			throw new Exception("Error while performing a checkpoint", e);
+			throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
+				getName() + '.', e);
 		}
 	}
 
@@ -678,7 +683,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		if (stateBackend != null) {
 			// backend has been configured on the environment
-			LOG.info("Using user-defined state backend: " + stateBackend);
+			LOG.info("Using user-defined state backend: {}.", stateBackend);
 		} else {
 			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
@@ -697,8 +702,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				case "filesystem":
 					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
-					LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
-							+ backend.getBasePath() + "\")");
+					LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+						backend.getBasePath());
 					stateBackend = backend;
 					break;
 
@@ -933,7 +938,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				}
 			} catch (Exception e) {
 				// registers the exception and tries to fail the whole task
-				AsynchronousException asyncException = new AsynchronousException(e);
+				AsynchronousException asyncException = new AsynchronousException(
+					new Exception(
+						"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
+							" for operator " + owner.getName() + '.',
+						e));
 				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			} finally {
 				owner.cancelables.unregisterClosable(this);