You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 16:46:50 UTC
[2/4] flink git commit: [FLINK-5278] Improve task and checkpoint related logging
[FLINK-5278] Improve task and checkpoint related logging
Add more logging
This closes #2959.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea708071
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea708071
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea708071
Branch: refs/heads/master
Commit: ea7080712f2dcbdf125b806007c80aa3d120f30a
Parents: d3f19a5
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 16:22:23 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:42:13 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 32 ++--
.../runtime/checkpoint/CompletedCheckpoint.java | 3 +-
.../ZooKeeperCompletedCheckpointStore.java | 75 ++++++++-
.../flink/runtime/executiongraph/Execution.java | 9 +-
.../apache/flink/runtime/taskmanager/Task.java | 165 ++++++++++++-------
.../ZooKeeperCompletedCheckpointStoreTest.java | 36 ++++
.../streaming/runtime/tasks/StreamTask.java | 25 ++-
7 files changed, 260 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8ca4b2e..5f0fd74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -613,6 +613,7 @@ public class CheckpointCoordinator {
if (shutdown || message == null) {
return false;
}
+
if (!job.equals(message.getJob())) {
LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
return false;
@@ -641,6 +642,9 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
case SUCCESS:
+ LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
+ checkpointId, message.getTaskExecutionId(), message.getJob());
+
if (checkpoint.isFullyAcknowledged()) {
// record the time when this was completed, to calculate
@@ -651,8 +655,8 @@ public class CheckpointCoordinator {
completed = checkpoint.finalizeCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
- LOG.info("Completed checkpoint " + checkpointId + " (in " +
- completed.getDuration() + " ms)");
+ LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+ completed.getStateSize(), completed.getDuration());
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
@@ -685,7 +689,7 @@ public class CheckpointCoordinator {
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());
- discardState(message.getSubtaskState());
+ discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
@@ -694,7 +698,7 @@ public class CheckpointCoordinator {
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
- discardState(message.getSubtaskState());
+ discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
}
else if (checkpoint != null) {
@@ -706,15 +710,17 @@ public class CheckpointCoordinator {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
isPendingCheckpoint = true;
- LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
+ LOG.warn("Received late message for now expired checkpoint attempt {} from " +
+ "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}
else {
- LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
+ LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
+ checkpointId, message.getTaskExecutionId(), message.getJob());
isPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
- discardState(message.getSubtaskState());
+ discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
}
@@ -947,19 +953,25 @@ public class CheckpointCoordinator {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
- LOG.error("Exception while triggering checkpoint", e);
+ LOG.error("Exception while triggering checkpoint.", e);
}
}
}
- private void discardState(final StateObject stateObject) {
+ private void discardState(
+ final JobID jobId,
+ final ExecutionAttemptID executionAttemptID,
+ final long checkpointId,
+ final StateObject stateObject) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
stateObject.discardState();
} catch (Exception e) {
- LOG.warn("Could not properly discard state object.", e);
+ LOG.warn("Could not properly discard state object of checkpoint {} " +
+ "belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId,
+ e);
}
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 3c33ce3..ed65011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
@@ -164,7 +163,7 @@ public class CompletedCheckpoint implements Serializable {
}
}
- public long getStateSize() throws IOException {
+ public long getStateSize() {
long result = 0L;
for (TaskState taskState : taskStates.values()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4add504..fdd0d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -163,7 +163,17 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
.get(numberOfInitialCheckpoints - 1);
- CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState();
+ CompletedCheckpoint latestCheckpoint;
+ long checkpointId = pathToCheckpointId(latest.f1);
+
+ LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
+
+ try {
+ latestCheckpoint = latest.f0.retrieveState();
+ } catch (Exception e) {
+ throw new Exception("Could not retrieve the completed checkpoint " + checkpointId +
+ " from the state storage.", e);
+ }
checkpointStateHandles.add(latest);
@@ -190,7 +200,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
checkNotNull(checkpoint, "Checkpoint");
// First add the new one. If it fails, we don't want to loose existing data.
- String path = String.format("/%s", checkpoint.getCheckpointID());
+ String path = checkpointIdToPath(checkpoint.getCheckpointID());
final RetrievableStateHandle<CompletedCheckpoint> stateHandle =
checkpointsInZooKeeper.add(path, checkpoint);
@@ -298,14 +308,36 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);
+
try {
if (event.getType() == CuratorEventType.DELETE) {
if (event.getResultCode() == 0) {
+ Exception exception = null;
+
try {
action.call();
- } finally {
+ } catch (Exception e) {
+ exception = new Exception("Could not execute callable action " +
+ "for checkpoint " + checkpointId + '.', e);
+ }
+
+ try {
// Discard the state handle
stateHandleAndPath.f0.discardState();
+ } catch (Exception e) {
+ Exception newException = new Exception("Could not discard meta " +
+ "data for completed checkpoint " + checkpointId + '.', e);
+
+ if (exception == null) {
+ exception = newException;
+ } else {
+ exception.addSuppressed(newException);
+ }
+ }
+
+ if (exception != null) {
+ throw exception;
}
} else {
throw new IllegalStateException("Unexpected result code " +
@@ -316,7 +348,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
event.getType() + " in '" + event + "' callback.");
}
} catch (Exception e) {
- LOG.error("Failed to discard checkpoint.", e);
+ LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
}
}
};
@@ -326,4 +358,39 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
// inconsistent state.
checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
}
+
+ /**
+ * Convert a checkpoint id into a ZooKeeper path.
+ *
+ * @param checkpointId to convert to the path
+ * @return Path created from the given checkpoint id
+ */
+ protected static String checkpointIdToPath(long checkpointId) {
+ return String.format("/%s", checkpointId);
+ }
+
+ /**
+ * Converts a path to the checkpoint id.
+ *
+ * @param path in ZooKeeper
+ * @return Checkpoint id parsed from the path, or -1 if the number cannot be parsed
+ */
+ protected static long pathToCheckpointId(String path) {
+ try {
+ String numberString;
+
+ // check if we have a leading slash
+ if ('/' == path.charAt(0) ) {
+ numberString = path.substring(1);
+ } else {
+ numberString = path;
+ }
+ return Long.parseLong(numberString);
+ } catch (NumberFormatException e) {
+ LOG.warn("Could not parse checkpoint id from {}. This indicates that the " +
+ "checkpoint id to path conversion has changed.", path);
+
+ return -1L;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 219d71d..16aebce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1015,14 +1015,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
// sanity check
if (currentState.isTerminal()) {
- throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + ".");
+ throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + '.');
}
if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
markTimestamp(targetState);
- LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " (" + getAttemptId() + ") switched from "
- + currentState + " to " + targetState);
+ if (error == null) {
+ LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState);
+ } else {
+ LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error);
+ }
// make sure that the state transition completes normally.
// potential errors (in listeners may not affect the main logic)
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 14ef1bf..184c3b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -504,7 +504,7 @@ public class Task implements Runnable, TaskActions {
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
- if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+ if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
// success, we can start our work
break;
}
@@ -515,14 +515,14 @@ public class Task implements Runnable, TaskActions {
return;
}
else if (current == ExecutionState.CANCELING) {
- if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+ if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final state
notifyFinalState();
return;
}
}
else {
- throw new IllegalStateException("Invalid state for beginning of task operation");
+ throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
}
}
@@ -543,7 +543,7 @@ public class Task implements Runnable, TaskActions {
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
- LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+ LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader(libraryCache);
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
@@ -572,7 +572,7 @@ public class Task implements Runnable, TaskActions {
// the registration must also strictly be undone
// ----------------------------------------------------------------
- LOG.info("Registering task at network: " + this);
+ LOG.info("Registering task at network: {}.", this);
network.registerTask(this);
@@ -581,13 +581,15 @@ public class Task implements Runnable, TaskActions {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration))
{
- LOG.info("Obtaining local cache file for '" + entry.getKey() + '\'');
+ LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
- throw new Exception("Exception while adding files to distributed cache.", e);
+ throw new Exception(
+ String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
+ e);
}
if (isCanceledOrFailed()) {
@@ -638,7 +640,7 @@ public class Task implements Runnable, TaskActions {
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
- if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+ if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
@@ -671,7 +673,7 @@ public class Task implements Runnable, TaskActions {
// try to mark the task as finished
// if that fails, the task was canceled/failed in the meantime
- if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+ if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
notifyObservers(ExecutionState.FINISHED, null);
}
else {
@@ -694,7 +696,7 @@ public class Task implements Runnable, TaskActions {
if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
if (t instanceof CancelTaskException) {
- if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+ if (transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable();
notifyObservers(ExecutionState.CANCELED, null);
@@ -702,19 +704,19 @@ public class Task implements Runnable, TaskActions {
}
}
else {
- if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ if (transitionState(current, ExecutionState.FAILED, t)) {
// proper failure of the task. record the exception as the root cause
- LOG.error("Task execution failed. ", t);
+ String errorMessage = String.format("Execution of {} ({}) failed.", taskNameWithSubtask, executionId);
failureCause = t;
cancelInvokable();
- notifyObservers(ExecutionState.FAILED, t);
+ notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
break;
}
}
}
else if (current == ExecutionState.CANCELING) {
- if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+ if (transitionState(current, ExecutionState.CANCELED)) {
notifyObservers(ExecutionState.CANCELED, null);
break;
}
@@ -724,22 +726,22 @@ public class Task implements Runnable, TaskActions {
break;
}
// unexpected state, go to failed
- else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
- LOG.error("Unexpected state in Task during an exception: " + current);
+ else if (transitionState(current, ExecutionState.FAILED, t)) {
+ LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
break;
}
// else fall through the loop and
}
}
catch (Throwable tt) {
- String message = "FATAL - exception in task exception handler";
+ String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
LOG.error(message, tt);
notifyFatalError(message, tt);
}
}
finally {
try {
- LOG.info("Freeing task resources for " + taskNameWithSubtask);
+ LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
@@ -767,7 +769,7 @@ public class Task implements Runnable, TaskActions {
}
catch (Throwable t) {
// an error in the resource cleanup is fatal
- String message = "FATAL - exception in task resource cleanup";
+ String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
LOG.error(message, t);
notifyFatalError(message, t);
}
@@ -779,7 +781,7 @@ public class Task implements Runnable, TaskActions {
metrics.close();
}
catch (Throwable t) {
- LOG.error("Error during metrics de-registration", t);
+ LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
}
}
}
@@ -845,6 +847,39 @@ public class Task implements Runnable, TaskActions {
taskManagerConnection.notifyFatalError(message, cause);
}
+ /**
+ * Try to transition the execution state from the current state to the new state.
+ *
+ * @param currentState of the execution
+ * @param newState of the execution
+ * @return true if the transition was successful, otherwise false
+ */
+ private boolean transitionState(ExecutionState currentState, ExecutionState newState) {
+ return transitionState(currentState, newState, null);
+ }
+
+ /**
+ * Try to transition the execution state from the current state to the new state.
+ *
+ * @param currentState of the execution
+ * @param newState of the execution
+ * @param cause of the transition change or null
+ * @return true if the transition was successful, otherwise false
+ */
+ private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
+ if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
+ if (cause == null) {
+ LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
+ } else {
+ LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
// ----------------------------------------------------------------------------------------------------------------
// Stopping / Canceling / Failing the task from the outside
// ----------------------------------------------------------------------------------------------------------------
@@ -859,22 +894,22 @@ public class Task implements Runnable, TaskActions {
* if the {@link AbstractInvokable} does not implement {@link StoppableTask}
*/
public void stopExecution() throws UnsupportedOperationException {
- LOG.info("Attempting to stop task " + taskNameWithSubtask);
- if(this.invokable instanceof StoppableTask) {
+ LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);
+ if (invokable instanceof StoppableTask) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
- ((StoppableTask)Task.this.invokable).stop();
+ ((StoppableTask)invokable).stop();
} catch(RuntimeException e) {
- LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
+ LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e);
taskManagerConnection.failTask(executionId, e);
}
}
};
- executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
+ executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
} else {
- throw new UnsupportedOperationException("Stopping not supported by this task.");
+ throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
}
}
@@ -887,7 +922,7 @@ public class Task implements Runnable, TaskActions {
* <p>This method never blocks.</p>
*/
public void cancelExecution() {
- LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+ LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
}
@@ -902,37 +937,52 @@ public class Task implements Runnable, TaskActions {
*/
@Override
public void failExternally(Throwable cause) {
- LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
+ LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
}
private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
while (true) {
- ExecutionState current = this.executionState;
+ ExecutionState current = executionState;
// if the task is already canceled (or canceling) or finished or failed,
// then we need not do anything
if (current.isTerminal() || current == ExecutionState.CANCELING) {
- LOG.info("Task " + taskNameWithSubtask + " is already in state " + current);
+ LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
return;
}
if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
- if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
+ if (transitionState(current, targetState, cause)) {
// if we manage this state transition, then the invokable gets never called
// we need not call cancel on it
this.failureCause = cause;
- notifyObservers(targetState, cause);
+ notifyObservers(
+ targetState,
+ new Exception(
+ String.format(
+ "Cancel or fail execution of %s (%s).",
+ taskNameWithSubtask,
+ executionId),
+ cause));
return;
}
}
else if (current == ExecutionState.RUNNING) {
- if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) {
+ if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
// we are canceling / failing out of the running state
// we need to cancel the invokable
if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
- notifyObservers(targetState, cause);
+ notifyObservers(
+ targetState,
+ new Exception(
+ String.format(
+ "Cancel or fail execution of %s (%s).",
+ taskNameWithSubtask,
+ executionId),
+ cause));
+
LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
// because the canceling may block on user code, we cancel from a separate thread
@@ -951,7 +1001,7 @@ public class Task implements Runnable, TaskActions {
producedPartitions,
inputGates);
Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
- "Canceler for " + taskNameWithSubtask);
+ String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
cancelThread.setDaemon(true);
cancelThread.start();
}
@@ -959,7 +1009,8 @@ public class Task implements Runnable, TaskActions {
}
}
else {
- throw new IllegalStateException("Unexpected task state: " + current);
+ throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
+ current, taskNameWithSubtask, executionId));
}
}
}
@@ -973,13 +1024,6 @@ public class Task implements Runnable, TaskActions {
}
private void notifyObservers(ExecutionState newState, Throwable error) {
- if (error == null) {
- LOG.info(taskNameWithSubtask + " switched to " + newState);
- }
- else {
- LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error);
- }
-
TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
for (TaskExecutionStateListener listener : taskExecutionStateListeners) {
@@ -1066,24 +1110,29 @@ public class Task implements Runnable, TaskActions {
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
- "Error while triggering checkpoint for " + taskName,
- t));
+ "Error while triggering checkpoint " + checkpointID + " for " +
+ taskNameWithSubtask, t));
+ } else {
+ LOG.debug("Encountered error while triggering checkpoint {} for " +
+ "{} ({}) while being not in state running.", checkpointID,
+ taskNameWithSubtask, executionId, t);
}
}
}
};
- executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
+ executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+
+ LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
+ taskNameWithSubtask, executionId);
- LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
- + taskNameWithSubtask);
}
}
else {
- LOG.debug("Declining checkpoint request for non-running task");
+ LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
@@ -1120,12 +1169,12 @@ public class Task implements Runnable, TaskActions {
executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
}
else {
- LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - "
- + taskNameWithSubtask);
+ LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.",
+ taskNameWithSubtask);
}
}
else {
- LOG.debug("Ignoring checkpoint commit notification for non-running task.");
+ LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
}
}
@@ -1228,14 +1277,14 @@ public class Task implements Runnable, TaskActions {
invokable.cancel();
}
catch (Throwable t) {
- LOG.error("Error while canceling task " + taskNameWithSubtask, t);
+ LOG.error("Error while canceling task {}.", taskNameWithSubtask, t);
}
}
}
@Override
public String toString() {
- return taskNameWithSubtask + " [" + executionState + ']';
+ return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState);
}
/**
@@ -1312,7 +1361,7 @@ public class Task implements Runnable, TaskActions {
try {
invokable.cancel();
} catch (Throwable t) {
- logger.error("Error while canceling the task", t);
+ logger.error("Error while canceling the task {}.", taskName, t);
}
// Early release of input and output buffer pools. We do this
@@ -1326,7 +1375,7 @@ public class Task implements Runnable, TaskActions {
try {
partition.destroyBufferPool();
} catch (Throwable t) {
- LOG.error("Failed to release result partition buffer pool.", t);
+ LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t);
}
}
@@ -1334,7 +1383,7 @@ public class Task implements Runnable, TaskActions {
try {
inputGate.releaseAllResources();
} catch (Throwable t) {
- LOG.error("Failed to release input gate.", t);
+ LOG.error("Failed to release input gate for task {}.", taskName, t);
}
}
@@ -1352,7 +1401,7 @@ public class Task implements Runnable, TaskActions {
watchDogThread.join();
}
} catch (Throwable t) {
- logger.error("Error in the task canceler", t);
+ logger.error("Error in the task canceler for task {}.", taskName, t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..6ee0141
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
+
+ @Test
+ public void testPathConversion() {
+ final long checkpointId = 42L;
+
+ final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
+
+ assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea708071/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 54f6c10..88a29ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -211,7 +211,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
boolean disposed = false;
try {
// -------- Initialize ---------
- LOG.debug("Initializing {}", getName());
+ LOG.debug("Initializing {}.", getName());
asyncOperationsThreadPool = Executors.newCachedThreadPool();
@@ -528,8 +528,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
- throw e;
+ throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
+ "for operator " + getName() + '.', e);
} else {
+ LOG.debug("Could not perform checkpoint {} for operator {} while the " +
+ "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
}
}
@@ -541,10 +544,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
performCheckpoint(checkpointMetaData);
}
catch (CancelTaskException e) {
- throw e;
+ throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
+ checkpointMetaData.getCheckpointId() + '.');
}
catch (Exception e) {
- throw new Exception("Error while performing a checkpoint", e);
+ throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
+ getName() + '.', e);
}
}
@@ -678,7 +683,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (stateBackend != null) {
// backend has been configured on the environment
- LOG.info("Using user-defined state backend: " + stateBackend);
+ LOG.info("Using user-defined state backend: {}.", stateBackend);
} else {
// see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
@@ -697,8 +702,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
case "filesystem":
FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
- LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
- + backend.getBasePath() + "\")");
+ LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+ backend.getBasePath());
stateBackend = backend;
break;
@@ -933,7 +938,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
} catch (Exception e) {
// registers the exception and tries to fail the whole task
- AsynchronousException asyncException = new AsynchronousException(e);
+ AsynchronousException asyncException = new AsynchronousException(
+ new Exception(
+ "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
+ " for operator " + owner.getName() + '.',
+ e));
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
} finally {
owner.cancelables.unregisterClosable(this);