You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/04/25 02:23:23 UTC
svn commit: r1330075 - in /incubator/mesos/trunk/src: master/master.cpp slave/slave.cpp slave/slave.hpp
Author: benh
Date: Wed Apr 25 00:23:22 2012
New Revision: 1330075
URL: http://svn.apache.org/viewvc?rev=1330075&view=rev
Log:
Fixes for sending status updates from the slave (after failures, after a framework finishes, etc.) instead of from the master (contributed by Charles Reiss).
Modified:
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Wed Apr 25 00:23:22 2012
@@ -1043,43 +1043,6 @@ void Master::exitedExecutor(const SlaveI
<< " (" << slave->info.hostname() << ") "
<< "exited with status " << status;
- // TODO(benh): What about a status update that is on it's way
- // from the slave but got re-ordered on the wire? By sending
- // this status updates here we are not allowing possibly
- // finished tasks to reach the scheduler appropriately. In
- // stead, it seems like perhaps the right thing to do is to have
- // the slave be responsible for sending those status updates,
- // and have the master (or better yet ... and soon ... the
- // scheduler) decide that a task is dead only when a slave lost
- // has occured.
-
- // Tell the framework which tasks have been lost.
- foreachvalue (Task* task, utils::copy(framework->tasks)) {
- if (task->slave_id() == slave->id &&
- task->executor_id() == executorId) {
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(task->framework_id());
- update->mutable_executor_id()->MergeFrom(task->executor_id());
- update->mutable_slave_id()->MergeFrom(task->slave_id());
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(task->task_id());
- status->set_state(TASK_LOST);
- status->set_message("Lost executor");
- update->set_timestamp(Clock::now());
- update->set_uuid(UUID::random().toBytes());
- send(framework->pid, message);
-
- LOG(INFO) << "Removing task " << task->task_id()
- << " of framework " << frameworkId
- << " because of lost executor";
-
- stats.tasks[TASK_LOST]++;
-
- removeTask(task);
- }
- }
-
// Remove executor from slave and framework.
slave->removeExecutor(frameworkId, executorId);
framework->removeExecutor(slave->id, executorId);
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 25 00:23:22 2012
@@ -71,6 +71,15 @@ namespace mesos { namespace internal { n
// double timeout;
// };
+// Helper function that returns true if the task state is terminal
+bool isTerminalTaskState(TaskState state)
+{
+ return state == TASK_FINISHED ||
+ state == TASK_FAILED ||
+ state == TASK_KILLED ||
+ state == TASK_LOST;
+}
+
Slave::Slave(const Resources& _resources,
bool _local,
@@ -678,7 +687,14 @@ void Slave::statusUpdateAcknowledgement(
LOG(INFO) << "Got acknowledgement of status update"
<< " for task " << taskId
<< " of framework " << frameworkId;
+
framework->updates.erase(UUID::fromBytes(uuid));
+
+ // Cleanup if this framework has no executors running and no pending updates.
+ if (framework->executors.size() == 0 && framework->updates.empty()) {
+ frameworks.erase(framework->id);
+ delete framework;
+ }
}
}
}
@@ -969,10 +985,7 @@ void Slave::statusUpdate(const StatusUpd
executor->updateTaskState(status.task_id(), status.state());
// Handle the task appropriately if it's terminated.
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
+ if (isTerminalTaskState(status.state())) {
executor->removeTask(status.task_id());
dispatch(isolationModule,
@@ -1299,6 +1312,57 @@ void Slave::executorStarted(const Framew
}
+StatusUpdate Slave::createStatusUpdate(const TaskID& taskId,
+ const ExecutorID& executorId,
+ const FrameworkID& frameworkId,
+ TaskState taskState,
+ const string& reason)
+{
+ TaskStatus status;
+ status.mutable_task_id()->MergeFrom(taskId);
+ status.set_state(taskState);
+ status.set_message(reason);
+
+ StatusUpdate update;
+ update.mutable_framework_id()->MergeFrom(frameworkId);
+ update.mutable_slave_id()->MergeFrom(id);
+ update.mutable_executor_id()->MergeFrom(executorId);
+ update.mutable_status()->MergeFrom(status);
+ update.set_timestamp(Clock::now());
+ update.set_uuid(UUID::random().toBytes());
+
+ return update;
+}
+
+
+// Called when an executor is exited.
+// Transitions a live task to TASK_LOST/TASK_FAILED and sends status update.
+void Slave::transitionLiveTask(const TaskID& taskId,
+ const ExecutorID& executorId,
+ const FrameworkID& frameworkId,
+ bool isCommandExecutor,
+ int status)
+{
+ StatusUpdate update;
+
+ if (isCommandExecutor) {
+ update = createStatusUpdate(taskId,
+ executorId,
+ frameworkId,
+ TASK_FAILED,
+ "Executor running the task's command failed");
+ } else {
+ update = createStatusUpdate(taskId,
+ executorId,
+ frameworkId,
+ TASK_LOST,
+ "Executor exited");
+ }
+
+ statusUpdate(update);
+}
+
+
// Called by the isolation module when an executor process exits.
void Slave::executorExited(const FrameworkID& frameworkId,
const ExecutorID& executorId,
@@ -1329,71 +1393,44 @@ void Slave::executorExited(const Framewo
return;
}
- // Check if the executor was only for running a task with a command
- // and if so send a status update for that task.
- Option<TaskID> taskId;
-
- if (!executor->queuedTasks.empty() &&
- executor->queuedTasks.begin()->second.has_command()) {
- taskId = Option<TaskID>::some(
- executor->queuedTasks.begin()->second.task_id());
- } else if (!executor->launchedTasks.empty() &&
- executor->launchedTasks.begin()->second->has_executor_id()) {
- taskId = Option<TaskID>::some(
- executor->launchedTasks.begin()->second->task_id());
- }
-
- if (taskId.isSome()) {
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(taskId.get());
- status.set_state(TASK_FAILED);
- status.set_message("Executor running the task's command failed");
-
- StatusUpdate update;
- update.mutable_framework_id()->MergeFrom(frameworkId);
- update.mutable_slave_id()->MergeFrom(id);
- update.mutable_status()->MergeFrom(status);
- update.set_timestamp(Clock::now());
- update.set_uuid(UUID::random().toBytes());
+ bool isCommandExecutor = false;
- // Send message and record the status for possible resending.
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(self());
- send(master, message);
-
- UUID uuid = UUID::fromBytes(update.uuid());
-
- // Send us a message to try and resend after some delay.
- delay(STATUS_UPDATE_RETRY_INTERVAL_SECONDS,
- self(), &Slave::statusUpdateTimeout,
- framework->id, uuid);
+ // Transition all live tasks to TASK_LOST/TASK_FAILED.
+ foreachvalue (Task* task, executor->launchedTasks) {
+ if (!isTerminalTaskState(task->state())) {
+ isCommandExecutor = !task->has_executor_id();
+
+ transitionLiveTask(task->task_id(),
+ executor->id,
+ framework->id,
+ isCommandExecutor,
+ status);
+ }
+ }
- framework->updates[uuid] = update;
+ // Transition all queued tasks to TASK_LOST/TASK_FAILED.
+ foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ isCommandExecutor = task.has_command();
- stats.tasks[status.state()]++;
+ transitionLiveTask(task.task_id(),
+ executor->id,
+ framework->id,
+ isCommandExecutor,
+ status);
}
- // TODO(benh): Send status updates for remaining tasks here rather
- // than at the master! As in, eliminate the code in
- // Master::exitedExecutor and put it here.
- ExitedExecutorMessage message;
- message.mutable_slave_id()->MergeFrom(id);
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_status(status);
- send(master, message);
-
- framework->destroyExecutor(executor->id);
+ if (!isCommandExecutor) {
+ ExitedExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(id);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_status(status);
- // Cleanup if this framework has nothing running.
- if (framework->executors.size() == 0) {
- // TODO(benh): But there might be some remaining status updates
- // that haven't been acknowledged!
- frameworks.erase(framework->id);
- delete framework;
+ send(master, message);
}
+
+ framework->destroyExecutor(executor->id);
}
@@ -1441,26 +1478,8 @@ void Slave::shutdownExecutorTimeout(cons
&IsolationModule::killExecutor,
framework->id, executor->id);
- ExitedExecutorMessage message;
- message.mutable_slave_id()->MergeFrom(id);
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_status(-1);
- send(master, message);
-
- // TODO(benh): Send status updates for remaining tasks here rather
- // than at the master! As in, eliminate the code in
- // Master::exitedExecutor and put it here.
-
- framework->destroyExecutor(executor->id);
-
- // Cleanup if this framework has nothing running.
- if (framework->executors.size() == 0) {
- // TODO(benh): But there might be some remaining status updates
- // that haven't been acknowledged!
- frameworks.erase(framework->id);
- delete framework;
- }
+ // Call executorExited here to transition live tasks to LOST/FAILED.
+ executorExited(frameworkId, executorId, 0);
}
}
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 25 00:23:22 2012
@@ -98,6 +98,12 @@ public:
void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
+ StatusUpdate createStatusUpdate(const TaskID& taskId,
+ const ExecutorID& executorId,
+ const FrameworkID& frameworkId,
+ TaskState taskState,
+ const std::string& message);
+
void executorStarted(const FrameworkID& frameworkId,
const ExecutorID& executorId,
pid_t pid);
@@ -106,6 +112,11 @@ public:
const ExecutorID& executorId,
int status);
+ void transitionLiveTask(const TaskID& taskId,
+ const ExecutorID& executorId,
+ const FrameworkID& frameworkId,
+ bool command_executor,
+ int status);
protected:
virtual void initialize();
virtual void finalize();