You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 17:08:27 UTC

flink git commit: [FLINK-5278] Improve task and checkpoint related logging

Repository: flink
Updated Branches:
  refs/heads/release-1.1 33df945fe -> b046038ae


[FLINK-5278] Improve task and checkpoint related logging

This closes #2690.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b046038a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b046038a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b046038a

Branch: refs/heads/release-1.1
Commit: b046038ae11f7662b6d788c1f005a9a61a45393b
Parents: 33df945
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 16:22:23 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 18:07:55 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  23 ++-
 .../ZooKeeperCompletedCheckpointStore.java      | 107 ++++++++++--
 .../flink/runtime/executiongraph/Execution.java |   9 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 168 ++++++++++++-------
 .../src/main/resources/log4j.properties         |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  19 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  36 ++++
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |  42 +++--
 9 files changed, 302 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index a3e511f..e5675c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -675,6 +675,7 @@ public class CheckpointCoordinator {
 		if (shutdown || message == null) {
 			return false;
 		}
+
 		if (!job.equals(message.getJob())) {
 			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
 			return false;
@@ -710,7 +711,7 @@ public class CheckpointCoordinator {
 								"the state handle to avoid lingering state.", message.getCheckpointId(),
 							message.getTaskExecutionId(), message.getJob());
 
-						discardState(message.getState());
+						discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
 						break;
 					case DISCARDED:
 						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
@@ -718,7 +719,7 @@ public class CheckpointCoordinator {
 								"state handle tp avoid lingering state.",
 							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-						discardState(message.getState());
+						discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
 				}
 
 				return true;
@@ -734,13 +735,15 @@ public class CheckpointCoordinator {
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					wasPendingCheckpoint = true;
-					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
+					LOG.warn("Received late message for now expired checkpoint attempt {} from " +
+						"task {} and job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
 
 					// try to discard the state so that we don't have lingering state lying around
-					discardState(message.getState());
+					discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
 				}
 				else {
-					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
+					LOG.debug("Received message for an unknown checkpoint {} from task {} and job" +
+						" {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
 					wasPendingCheckpoint = false;
 				}
 
@@ -1112,7 +1115,11 @@ public class CheckpointCoordinator {
 		}
 	}
 
-	private void discardState(final SerializedValue<StateHandle<?>> stateObject) {
+	private void discardState(
+			final JobID jobId,
+			final ExecutionAttemptID executionAttemptID,
+			final long checkpointId,
+			final SerializedValue<StateHandle<?>> stateObject) {
 		if (stateObject != null) {
 			executor.execute(new Runnable() {
 				@Override
@@ -1120,7 +1127,9 @@ public class CheckpointCoordinator {
 					try {
 						stateObject.deserializeValue(userClassLoader).discardState();
 					} catch (Exception e) {
-						LOG.warn("Could not properly discard state object.", e);
+						LOG.warn("Could not properly discard state object for checkpoint {} " +
+							"belonging to task {} of job {}.", checkpointId,
+							executionAttemptID, jobId, e);
 					}
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 6570d00..9ae6c30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,7 +168,17 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
 					.get(numberOfInitialCheckpoints - 1);
 
-			CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
+			CompletedCheckpoint latestCheckpoint;
+			long checkpointId = pathToCheckpointId(latest.f1);
+
+			LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
+
+			try {
+				latestCheckpoint = latest.f0.getState(userClassLoader);
+			} catch (Exception e) {
+				throw new Exception("Could not retrieve the completed checkpoint " + checkpointId +
+				" from the state storage.", e);
+			}
 
 			checkpointStateHandles.add(latest);
 
@@ -194,7 +205,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		checkNotNull(checkpoint, "Checkpoint");
 
 		// First add the new one. If it fails, we don't want to loose existing data.
-		String path = String.format("/%s", checkpoint.getCheckpointID());
+		String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
 		final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
 
@@ -266,26 +277,57 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	/**
 	 * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
 	 */
-	private void removeFromZooKeeperAndDiscardCheckpoint(
-			final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+	private void removeFromZooKeeperAndDiscardCheckpoint(final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
 
 		final BackgroundCallback callback = new BackgroundCallback() {
 			@Override
 			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+				final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);
+
 				try {
 					if (event.getType() == CuratorEventType.DELETE) {
 						if (event.getResultCode() == 0) {
-							// The checkpoint
-							CompletedCheckpoint checkpoint = stateHandleAndPath
-									.f0.getState(userClassLoader);
+							Exception exception = null;
 
-							checkpoint.discard(userClassLoader);
-
-							// Discard the state handle
-							stateHandleAndPath.f0.discardState();
-
-							// Discard the checkpoint
-							LOG.debug("Discarded " + checkpoint);
+							// The checkpoint
+							CompletedCheckpoint checkpoint = null;
+
+							try {
+								checkpoint = stateHandleAndPath.f0.getState(userClassLoader);
+							} catch (Exception e) {
+								Exception newException = new Exception("Could not retrieve the completed checkpoint " +
+									checkpointId + " from the state storage.", e);
+
+								exception = ExceptionUtils.firstOrSuppressed(newException, exception);
+							}
+
+							if (checkpoint != null) {
+								try {
+									checkpoint.discard(userClassLoader);
+								} catch (Exception e) {
+									Exception newException = new Exception("Could not discard the completed checkpoint " +
+										checkpoint + '.', e);
+
+									exception = ExceptionUtils.firstOrSuppressed(newException, exception);
+								}
+							}
+
+							try {
+								// Discard the state handle
+								stateHandleAndPath.f0.discardState();
+							} catch (Exception e) {
+								Exception newException = new Exception("Could not discard meta data of completed checkpoint " +
+									checkpointId + '.', e);
+
+								exception = ExceptionUtils.firstOrSuppressed(newException, exception);
+							}
+
+							if (exception != null) {
+								throw exception;
+							} else {
+								// Discard the checkpoint
+								LOG.debug("Discarded {}.", checkpoint);
+							}
 						}
 						else {
 							throw new IllegalStateException("Unexpected result code " +
@@ -298,7 +340,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 					}
 				}
 				catch (Exception e) {
-					LOG.error("Failed to discard checkpoint.", e);
+					LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
 				}
 			}
 		};
@@ -308,4 +350,39 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// inconsistent state.
 		checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
 	}
+
+	/**
+	 * Convert a checkpoint id into a ZooKeeper path.
+	 *
+	 * @param checkpointId to convert to the path
+	 * @return Path created from the given checkpoint id
+	 */
+	protected static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%s", checkpointId);
+	}
+
+	/**
+	 * Converts a path to the checkpoint id.
+	 *
+	 * @param path in ZooKeeper
+	 * @return Checkpoint id parsed from the path, or -1 if the path is not a valid id
+	 */
+	protected static long pathToCheckpointId(String path) {
+		try {
+			String numberString;
+
+			// strip a leading slash; startsWith is safe for an empty path (parse fails below)
+			if (path.startsWith("/")) {
+				numberString = path.substring(1);
+			} else {
+				numberString = path;
+			}
+			return Long.parseLong(numberString);
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse checkpoint id from {}. This indicates that the " +
+				"checkpoint id to path conversion has changed.", path);
+
+			return -1L;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 84b679b..b336e54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -979,14 +979,17 @@ public class Execution implements Serializable {
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
 		// sanity check
 		if (currentState.isTerminal()) {
-			throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + ".");
+			throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + '.');
 		}
 
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
 
-			LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " ("  + getAttemptId() + ") switched from "
-				+ currentState + " to " + targetState);
+			if (error == null) {
+				LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState);
+			} else {
+				LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error);
+			}
 
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e4e1b36..8a446d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -480,7 +480,7 @@ public class Task implements Runnable {
 		while (true) {
 			ExecutionState current = this.executionState;
 			if (current == ExecutionState.CREATED) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+				if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
 					// success, we can start our work
 					break;
 				}
@@ -491,14 +491,14 @@ public class Task implements Runnable {
 				return;
 			}
 			else if (current == ExecutionState.CANCELING) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+				if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
 					// we were immediately canceled. tell the TaskManager that we reached our final state
 					notifyFinalState();
 					return;
 				}
 			}
 			else {
-				throw new IllegalStateException("Invalid state for beginning of task operation");
+				throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
 			}
 		}
 
@@ -516,7 +516,7 @@ public class Task implements Runnable {
 
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
-			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+			LOG.info("Loading JAR files for task {}.", this);
 
 			userCodeClassLoader = createUserCodeClassloader(libraryCache);
 			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
@@ -545,7 +545,7 @@ public class Task implements Runnable {
 			// the registration must also strictly be undone
 			// ----------------------------------------------------------------
 
-			LOG.info("Registering task at network: " + this);
+			LOG.info("Registering task at network: {}.", this);
 			network.registerTask(this);
 
 			// next, kick off the background copying of files for the distributed cache
@@ -553,13 +553,15 @@ public class Task implements Runnable {
 				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
 						DistributedCache.readFileInfoFromConfig(jobConfiguration))
 				{
-					LOG.info("Obtaining local cache file for '" + entry.getKey() + '\'');
+					LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
 					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
 					distributedCacheEntries.put(entry.getKey(), cp);
 				}
 			}
 			catch (Exception e) {
-				throw new Exception("Exception while adding files to distributed cache.", e);
+				throw new Exception(
+					String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
+					e);
 			}
 
 			if (isCanceledOrFailed()) {
@@ -598,11 +600,13 @@ public class Task implements Runnable {
 						StateUtils.setOperatorState(op, state);
 					}
 					catch (Exception e) {
-						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
+						throw new RuntimeException(
+							String.format("Failed to deserialize state handle and setup initial operator state for task %s (%s).", taskNameWithSubtask, executionId),
+							e);
 					}
 				}
 				else {
-					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
+					throw new IllegalStateException(String.format("Found operator state for a non-stateful task %s (%s)", taskNameWithSubtask, executionId));
 				}
 			}
 
@@ -621,7 +625,7 @@ public class Task implements Runnable {
 			this.invokable = invokable;
 
 			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
-			if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 				throw new CancelTaskException();
 			}
 
@@ -656,7 +660,7 @@ public class Task implements Runnable {
 
 			// try to mark the task as finished
 			// if that fails, the task was canceled/failed in the meantime
-			if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+			if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
 				notifyObservers(ExecutionState.FINISHED, null);
 			}
 			else {
@@ -679,7 +683,7 @@ public class Task implements Runnable {
 
 					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
 						if (t instanceof CancelTaskException) {
-							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+							if (transitionState(current, ExecutionState.CANCELED)) {
 								cancelInvokable();
 
 								notifyObservers(ExecutionState.CANCELED, null);
@@ -687,19 +691,19 @@ public class Task implements Runnable {
 							}
 						}
 						else {
-							if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+							if (transitionState(current, ExecutionState.FAILED, t)) {
 								// proper failure of the task. record the exception as the root cause
-								LOG.error("Task execution failed. ", t);
+								String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId);
 								failureCause = t;
 								cancelInvokable();
 
-								notifyObservers(ExecutionState.FAILED, t);
+								notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
 								break;
 							}
 						}
 					}
 					else if (current == ExecutionState.CANCELING) {
-						if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+						if (transitionState(current, ExecutionState.CANCELED)) {
 							notifyObservers(ExecutionState.CANCELED, null);
 							break;
 						}
@@ -709,22 +713,22 @@ public class Task implements Runnable {
 						break;
 					}
 					// unexpected state, go to failed
-					else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-						LOG.error("Unexpected state in Task during an exception: " + current);
+					else if (transitionState(current, ExecutionState.FAILED, t)) {
+						LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
 						break;
 					}
 					// else fall through the loop and
 				}
 			}
 			catch (Throwable tt) {
-				String message = "FATAL - exception in task exception handler";
+				String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
 				LOG.error(message, tt);
 				notifyFatalError(message, tt);
 			}
 		}
 		finally {
 			try {
-				LOG.info("Freeing task resources for " + taskNameWithSubtask);
+				LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
 
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
@@ -751,7 +755,7 @@ public class Task implements Runnable {
 			}
 			catch (Throwable t) {
 				// an error in the resource cleanup is fatal
-				String message = "FATAL - exception in task resource cleanup";
+				String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
 				LOG.error(message, t);
 				notifyFatalError(message, t);
 			}
@@ -763,7 +767,7 @@ public class Task implements Runnable {
 				metrics.close();
 			}
 			catch (Throwable t) {
-				LOG.error("Error during metrics de-registration", t);
+				LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
 			}
 		}
 	}
@@ -829,6 +833,39 @@ public class Task implements Runnable {
 		taskManager.tell(new FatalError(message, cause));
 	}
 
+	/**
+	 * Try to transition the execution state from the current state to the new state.
+	 *
+	 * @param currentState of the execution
+	 * @param newState of the execution
+	 * @return true if the transition was successful, otherwise false
+	 */
+	private boolean transitionState(ExecutionState currentState, ExecutionState newState) {
+		return transitionState(currentState, newState, null);
+	}
+
+	/**
+	 * Try to transition the execution state from the current state to the new state.
+	 *
+	 * @param currentState of the execution
+	 * @param newState of the execution
+	 * @param cause of the transition change or null
+	 * @return true if the transition was successful, otherwise false
+	 */
+	private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
+		if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
+			if (cause == null) {
+				LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
+			} else {
+				LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
+			}
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
 	// ----------------------------------------------------------------------------------------------------------------
 	//  Stopping / Canceling / Failing the task from the outside
 	// ----------------------------------------------------------------------------------------------------------------
@@ -843,22 +880,22 @@ public class Task implements Runnable {
 	 *             if the {@link AbstractInvokable} does not implement {@link StoppableTask}
 	 */
 	public void stopExecution() throws UnsupportedOperationException {
-		LOG.info("Attempting to stop task " + taskNameWithSubtask);
-		if(this.invokable instanceof StoppableTask) {
+		LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);
+		if (invokable instanceof StoppableTask) {
 			Runnable runnable = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						((StoppableTask)Task.this.invokable).stop();
+						((StoppableTask)invokable).stop();
 					} catch(RuntimeException e) {
-						LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
+						LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
 						taskManager.tell(new FailTask(executionId, e));
 					}
 				}
 			};
-			executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
+			executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
 		} else {
-			throw new UnsupportedOperationException("Stopping not supported by this task.");
+			throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
 		}
 	}
 
@@ -871,7 +908,7 @@ public class Task implements Runnable {
 	 * <p>This method never blocks.</p>
 	 */
 	public void cancelExecution() {
-		LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+		LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
 		cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
 	}
 
@@ -885,37 +922,52 @@ public class Task implements Runnable {
 	 * <p>This method never blocks.</p>
 	 */
 	public void failExternally(Throwable cause) {
-		LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
+		LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
 		cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
 	}
 
 	private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
 		while (true) {
-			ExecutionState current = this.executionState;
+			ExecutionState current = executionState;
 
 			// if the task is already canceled (or canceling) or finished or failed,
 			// then we need not do anything
 			if (current.isTerminal() || current == ExecutionState.CANCELING) {
-				LOG.info("Task " + taskNameWithSubtask + " is already in state " + current);
+				LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
 				return;
 			}
 
 			if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
-				if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
+				if (transitionState(current, targetState, cause)) {
 					// if we manage this state transition, then the invokable gets never called
 					// we need not call cancel on it
 					this.failureCause = cause;
-					notifyObservers(targetState, cause);
+					notifyObservers(
+						targetState,
+						new Exception(
+							String.format(
+								"Cancel or fail execution of %s (%s).",
+								taskNameWithSubtask,
+								executionId),
+							cause));
 					return;
 				}
 			}
 			else if (current == ExecutionState.RUNNING) {
-				if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
+				if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
 					// we are canceling / failing out of the running state
 					// we need to cancel the invokable
 					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
 						this.failureCause = cause;
-						notifyObservers(targetState, cause);
+						notifyObservers(
+							targetState,
+							new Exception(
+								String.format(
+									"Cancel or fail execution of %s (%s).",
+									taskNameWithSubtask,
+									executionId),
+								cause));
+
 						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
 						// because the canceling may block on user code, we cancel from a separate thread
@@ -934,7 +986,7 @@ public class Task implements Runnable {
 								producedPartitions,
 								inputGates);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
-								"Canceler for " + taskNameWithSubtask);
+								String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
 						cancelThread.setDaemon(true);
 						cancelThread.start();
 					}
@@ -942,7 +994,8 @@ public class Task implements Runnable {
 				}
 			}
 			else {
-				throw new IllegalStateException("Unexpected task state: " + current);
+				throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
+					current, taskNameWithSubtask, executionId));
 			}
 		}
 	}
@@ -956,13 +1009,6 @@ public class Task implements Runnable {
 	}
 
 	private void notifyObservers(ExecutionState newState, Throwable error) {
-		if (error == null) {
-			LOG.info(taskNameWithSubtask + " switched to " + newState);
-		}
-		else {
-			LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error);
-		}
-
 		TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
 		UpdateTaskExecutionState actorMessage = new UpdateTaskExecutionState(stateUpdate);
 
@@ -1009,16 +1055,20 @@ public class Task implements Runnable {
 							if (getExecutionState() == ExecutionState.RUNNING) {
 								failExternally(new Exception(
 									"Error while triggering checkpoint " + checkpointID + " for " +
-										taskName, t));
+										taskNameWithSubtask, t));
+							} else {
+								LOG.debug("Encountered error while triggering checkpoint {} for " +
+									"{} ({}) while being not in state running.", checkpointID,
+									taskNameWithSubtask, executionId, t);
 							}
 						}
 					}
 				};
-				executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
+				executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 			}
 			else {
-				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
-						+ taskNameWithSubtask);
+				LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
+						taskNameWithSubtask, executionId);
 
 				DeclineCheckpoint decline = new DeclineCheckpoint(
 						jobId, executionId, checkpointID,
@@ -1027,7 +1077,7 @@ public class Task implements Runnable {
 			}
 		}
 		else {
-			LOG.debug("Declining checkpoint request for non-running task");
+			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
 
 			// send back a message that we did not do the checkpoint
 			DeclineCheckpoint decline = new DeclineCheckpoint(
@@ -1066,12 +1116,12 @@ public class Task implements Runnable {
 				executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
 			}
 			else {
-				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - "
-						+ taskNameWithSubtask);
+				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.",
+						taskNameWithSubtask);
 			}
 		}
 		else {
-			LOG.debug("Ignoring checkpoint commit notification for non-running task.");
+			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
 		}
 	}
 
@@ -1174,14 +1224,14 @@ public class Task implements Runnable {
 				invokable.cancel();
 			}
 			catch (Throwable t) {
-				LOG.error("Error while canceling task " + taskNameWithSubtask, t);
+				LOG.error("Error while canceling task {}.", taskNameWithSubtask, t);
 			}
 		}
 	}
 
 	@Override
 	public String toString() {
-		return taskNameWithSubtask + " [" + executionState + ']';
+		return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState);
 	}
 
 	/**
@@ -1257,7 +1307,7 @@ public class Task implements Runnable {
 				try {
 					invokable.cancel();
 				} catch (Throwable t) {
-					logger.error("Error while canceling the task", t);
+					logger.error("Error while canceling the task {}.", taskName, t);
 				}
 
 				// Early release of input and output buffer pools. We do this
@@ -1271,7 +1321,7 @@ public class Task implements Runnable {
 					try {
 						partition.destroyBufferPool();
 					} catch (Throwable t) {
-						LOG.error("Failed to release result partition buffer pool.", t);
+						LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t);
 					}
 				}
 
@@ -1279,7 +1329,7 @@ public class Task implements Runnable {
 					try {
 						inputGate.releaseAllResources();
 					} catch (Throwable t) {
-						LOG.error("Failed to release input gate.", t);
+						LOG.error("Failed to release input gate for task {}.", taskName, t);
 					}
 				}
 
@@ -1297,7 +1347,7 @@ public class Task implements Runnable {
 					watchDogThread.join();
 				}
 			} catch (Throwable t) {
-				logger.error("Error in the task canceler", t);
+				logger.error("Error in the task canceler for task {}.", taskName, t);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/log4j.properties b/flink-runtime/src/main/resources/log4j.properties
index 9912b19..749796f 100644
--- a/flink-runtime/src/main/resources/log4j.properties
+++ b/flink-runtime/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 
 
 # Convenience file for local debugging of the JobManager/TaskManager.
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cbf7b5d..2b455b7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -82,7 +82,6 @@ import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**
@@ -1382,18 +1381,20 @@ class JobManager(
                       case None => getClass.getClassLoader
                     }
 
-                    future {
-                      Option(ackMessage.getState()) match {
-                        case Some(state) =>
+                    Option(ackMessage.getState()) match {
+                      case Some(state) =>
+                        future {
                           try {
                             state.deserializeValue(classLoader).discardState()
                           } catch {
-                            case e: Exception => log.warn("Could not discard orphaned checkpoint " +
-                                             "state.", e)
+                            case e: Exception =>
+                              log.warn("Could not discard orphaned checkpoint state for " +
+                                         s"$ackMessage.", e)
                           }
-                        case None =>
-                      }
-                    }(ExecutionContext.fromExecutor(ioExecutor))
+                        } (ExecutionContext.fromExecutor(ioExecutor))
+                      case None =>
+                        // no state to discard
+                    }
                   }
                 } catch {
                   case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..6ee0141
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
+
+	@Test
+	public void testPathConversion() {
+		final long checkpointId = 42L;
+
+		final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
+
+		assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index f2fd859..6a696a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -103,6 +103,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -480,7 +481,9 @@ public class TaskManagerTest extends TestLogger {
 									"found."));
 
 							tm.tell(new StopTask(eid2), testActorGateway);
-							expectMsgEquals(new TaskOperationResult(eid2, false, "UnsupportedOperationException: Stopping not supported by this task."));
+							TaskOperationResult message = expectMsgClass(TaskOperationResult.class);
+							assertEquals(eid2, message.executionID());
+							assertFalse(message.success());
 
 							assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d7204a9..aa88175 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -202,7 +202,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		boolean disposed = false;
 		try {
 			// -------- Initialize ---------
-			LOG.debug("Initializing {}", getName());
+			LOG.debug("Initializing {}.", getName());
 
 			userClassLoader = getUserCodeClassLoader();
 			configuration = new StreamConfig(getTaskConfiguration());
@@ -587,8 +587,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
 			if (isRunning) {
-				throw e;
+				throw new Exception("Could not perform checkpoint " + checkpointId +
+					" for operator " + getName() + '.', e);
 			} else {
+				LOG.debug("Could not perform checkpoint {} for operator {} while the " +
+					"invokable was not in state running.", checkpointId, getName(), e);
 				return false;
 			}
 		}
@@ -600,10 +603,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			performCheckpoint(checkpointId, timestamp);
 		}
 		catch (CancelTaskException e) {
-			throw e;
+			throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
+				checkpointId + '.');
 		}
 		catch (Exception e) {
-			throw new Exception("Error while performing checkpoint " + checkpointId + '.', e);
+			throw new Exception("Could not perform checkpoint " + checkpointId + " for operator " +
+				getName() + '.', e);
 		}
 	}
 
@@ -651,13 +656,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 									try {
 										states[j].discardState();
 									} catch (Exception discardException) {
-										LOG.warn("Could not discard " + j + "th operator state.", discardException);
+										LOG.warn("Could not discard {}th operator state of " +
+											"checkpoint {} for operator {}.", j, checkpointId,
+											getName(), discardException);
 									}
 								}
 							}
 
-							throw new Exception("Could not perform the checkpoint for " + i +
-								"th operator in chain.", exception);
+							throw new Exception("Could not perform the checkpoint " + checkpointId +
+								" for " + i + "th operator in chain.", exception);
 						}
 
 						if (state.getOperatorState() instanceof AsynchronousStateHandle) {
@@ -768,7 +775,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 		if (stateBackend != null) {
 			// backend has been configured on the environment
-			LOG.info("Using user-defined state backend: " + stateBackend);
+			LOG.info("Using user-defined state backend: {}.", stateBackend);
 		} else {
 			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
@@ -787,8 +794,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 				case "filesystem":
 					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
-					LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
-						+ backend.getBasePath() + "\")");
+					LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+						backend.getBasePath());
 					stateBackend = backend;
 					break;
 
@@ -945,11 +952,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 								try {
 									states[j].discardState();
 								} catch (Exception discardException) {
-									LOG.warn("Could not discard the " + j + "th operator state.", discardException);
+									LOG.warn("Could not discard the {}th operator state of " +
+										"checkpoint {} for operator {}.", j, checkpointId,
+										owner.getName(), discardException);
 								}
 							}
 
-							throw new Exception("Could not materialize the " + i + "th operator state.", exception);
+							throw new Exception("Could not materialize the " + i + "th operator " +
+								"state of operator " + owner.getName() + " for checkpoint " +
+								checkpointId + '.', exception);
 						}
 					}
 				}
@@ -962,10 +973,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 			catch (Exception e) {
 				if (owner.isRunning()) {
-					LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
+					LOG.error("Caught exception while materializing asynchronous checkpoint {} for operator {}.", checkpointId, owner.getName(), e);
 				}
+
 				if (owner.asyncException == null) {
-					owner.asyncException = new AsynchronousException(e);
+					owner.asyncException = new AsynchronousException(
+						new Exception("Could not materialize checkpoint " + checkpointId +
+							" of operator " + owner.getName() + '.', e));
 				}
 			}
 			finally {