You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/04/25 02:23:23 UTC

svn commit: r1330075 - in /incubator/mesos/trunk/src: master/master.cpp slave/slave.cpp slave/slave.hpp

Author: benh
Date: Wed Apr 25 00:23:22 2012
New Revision: 1330075

URL: http://svn.apache.org/viewvc?rev=1330075&view=rev
Log:
Fixes for sending status updates from the slave (after failures, after a framework finishes, etc) instead of from the master (contributed by Charles Reiss).

Modified:
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Wed Apr 25 00:23:22 2012
@@ -1043,43 +1043,6 @@ void Master::exitedExecutor(const SlaveI
                 << " (" << slave->info.hostname() << ") "
                 << "exited with status " << status;
 
-      // TODO(benh): What about a status update that is on it's way
-      // from the slave but got re-ordered on the wire? By sending
-      // this status updates here we are not allowing possibly
-      // finished tasks to reach the scheduler appropriately. In
-      // stead, it seems like perhaps the right thing to do is to have
-      // the slave be responsible for sending those status updates,
-      // and have the master (or better yet ... and soon ... the
-      // scheduler) decide that a task is dead only when a slave lost
-      // has occured.
-
-      // Tell the framework which tasks have been lost.
-      foreachvalue (Task* task, utils::copy(framework->tasks)) {
-        if (task->slave_id() == slave->id &&
-            task->executor_id() == executorId) {
-          StatusUpdateMessage message;
-          StatusUpdate* update = message.mutable_update();
-          update->mutable_framework_id()->MergeFrom(task->framework_id());
-          update->mutable_executor_id()->MergeFrom(task->executor_id());
-          update->mutable_slave_id()->MergeFrom(task->slave_id());
-          TaskStatus* status = update->mutable_status();
-          status->mutable_task_id()->MergeFrom(task->task_id());
-          status->set_state(TASK_LOST);
-          status->set_message("Lost executor");
-          update->set_timestamp(Clock::now());
-          update->set_uuid(UUID::random().toBytes());
-          send(framework->pid, message);
-
-          LOG(INFO) << "Removing task " << task->task_id()
-                    << " of framework " << frameworkId
-                    << " because of lost executor";
-
-          stats.tasks[TASK_LOST]++;
-
-          removeTask(task);
-        }
-      }
-
       // Remove executor from slave and framework.
       slave->removeExecutor(frameworkId, executorId);
       framework->removeExecutor(slave->id, executorId);

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 25 00:23:22 2012
@@ -71,6 +71,15 @@ namespace mesos { namespace internal { n
 //   double timeout;
 // };
 
+// Returns true iff 'state' is terminal (FINISHED, FAILED, KILLED or LOST).
+bool isTerminalTaskState(TaskState state)
+{
+  return state == TASK_FINISHED ||
+    state == TASK_FAILED ||
+    state == TASK_KILLED ||
+    state == TASK_LOST;
+}
+
 
 Slave::Slave(const Resources& _resources,
              bool _local,
@@ -678,7 +687,14 @@ void Slave::statusUpdateAcknowledgement(
       LOG(INFO) << "Got acknowledgement of status update"
                 << " for task " << taskId
                 << " of framework " << frameworkId;
+
       framework->updates.erase(UUID::fromBytes(uuid));
+
+      // Cleanup if this framework has no executors running and no pending updates.
+      if (framework->executors.size() == 0 && framework->updates.empty()) {
+        frameworks.erase(framework->id);
+        delete framework;
+      }
     }
   }
 }
@@ -969,10 +985,7 @@ void Slave::statusUpdate(const StatusUpd
       executor->updateTaskState(status.task_id(), status.state());
 
       // Handle the task appropriately if it's terminated.
-      if (status.state() == TASK_FINISHED ||
-          status.state() == TASK_FAILED ||
-          status.state() == TASK_KILLED ||
-          status.state() == TASK_LOST) {
+      if (isTerminalTaskState(status.state())) {
         executor->removeTask(status.task_id());
 
         dispatch(isolationModule,
@@ -1299,6 +1312,57 @@ void Slave::executorStarted(const Framew
 }
 
 
+StatusUpdate Slave::createStatusUpdate(const TaskID& taskId,
+                                       const ExecutorID& executorId,
+                                       const FrameworkID& frameworkId,
+                                       TaskState taskState,
+                                       const string& reason)
+{
+  TaskStatus status; // Per-task part of the update: id, state, message.
+  status.mutable_task_id()->MergeFrom(taskId);
+  status.set_state(taskState);
+  status.set_message(reason); // 'reason' becomes the status message text.
+
+  StatusUpdate update; // Built and returned only; nothing is sent here.
+  update.mutable_framework_id()->MergeFrom(frameworkId);
+  update.mutable_slave_id()->MergeFrom(id); // Presumably this slave's own ID.
+  update.mutable_executor_id()->MergeFrom(executorId);
+  update.mutable_status()->MergeFrom(status);
+  update.set_timestamp(Clock::now());
+  update.set_uuid(UUID::random().toBytes()); // Fresh random UUID per update.
+
+  return update;
+}
+
+
+// Called from executorExited() for each queued or still-live launched task.
+// Builds TASK_FAILED (command executor) or TASK_LOST and calls statusUpdate().
+void Slave::transitionLiveTask(const TaskID& taskId,
+                               const ExecutorID& executorId,
+                               const FrameworkID& frameworkId,
+                               bool isCommandExecutor,
+                               int status) // Currently unused in the body.
+{
+  StatusUpdate update;
+
+  if (isCommandExecutor) {
+    update = createStatusUpdate(taskId,
+                                executorId,
+                                frameworkId,
+                                TASK_FAILED,
+                                "Executor running the task's command failed");
+  } else {
+    update = createStatusUpdate(taskId,
+                                executorId,
+                                frameworkId,
+                                TASK_LOST,
+                                "Executor exited");
+  }
+
+  statusUpdate(update); // Hand off to Slave::statusUpdate for processing.
+}
+
+
 // Called by the isolation module when an executor process exits.
 void Slave::executorExited(const FrameworkID& frameworkId,
                            const ExecutorID& executorId,
@@ -1329,71 +1393,44 @@ void Slave::executorExited(const Framewo
     return;
   }
 
-  // Check if the executor was only for running a task with a command
-  // and if so send a status update for that task.
-  Option<TaskID> taskId;
-
-  if (!executor->queuedTasks.empty() &&
-      executor->queuedTasks.begin()->second.has_command()) {
-    taskId = Option<TaskID>::some(
-        executor->queuedTasks.begin()->second.task_id());
-  } else if (!executor->launchedTasks.empty() &&
-             executor->launchedTasks.begin()->second->has_executor_id()) {
-    taskId = Option<TaskID>::some(
-        executor->launchedTasks.begin()->second->task_id());
-  }
-
-  if (taskId.isSome()) {
-    TaskStatus status;
-    status.mutable_task_id()->MergeFrom(taskId.get());
-    status.set_state(TASK_FAILED);
-    status.set_message("Executor running the task's command failed");
-
-    StatusUpdate update;
-    update.mutable_framework_id()->MergeFrom(frameworkId);
-    update.mutable_slave_id()->MergeFrom(id);
-    update.mutable_status()->MergeFrom(status);
-    update.set_timestamp(Clock::now());
-    update.set_uuid(UUID::random().toBytes());
+  bool isCommandExecutor = false;
 
-    // Send message and record the status for possible resending.
-    StatusUpdateMessage message;
-    message.mutable_update()->MergeFrom(update);
-    message.set_pid(self());
-    send(master, message);
-
-    UUID uuid = UUID::fromBytes(update.uuid());
-
-    // Send us a message to try and resend after some delay.
-    delay(STATUS_UPDATE_RETRY_INTERVAL_SECONDS,
-          self(), &Slave::statusUpdateTimeout,
-          framework->id, uuid);
+  // Transition all live tasks to TASK_LOST/TASK_FAILED.
+  foreachvalue (Task* task, executor->launchedTasks) {
+    if (!isTerminalTaskState(task->state())) {
+      isCommandExecutor = !task->has_executor_id();
+
+      transitionLiveTask(task->task_id(),
+                         executor->id,
+                         framework->id,
+                         isCommandExecutor,
+                         status);
+    }
+  }
 
-    framework->updates[uuid] = update;
+  // Transition all queued tasks to TASK_LOST/TASK_FAILED.
+  foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+    isCommandExecutor = task.has_command();
 
-    stats.tasks[status.state()]++;
+    transitionLiveTask(task.task_id(),
+                       executor->id,
+                       framework->id,
+                       isCommandExecutor,
+                       status);
   }
 
-  // TODO(benh): Send status updates for remaining tasks here rather
-  // than at the master! As in, eliminate the code in
-  // Master::exitedExecutor and put it here.
 
-  ExitedExecutorMessage message;
-  message.mutable_slave_id()->MergeFrom(id);
-  message.mutable_framework_id()->MergeFrom(frameworkId);
-  message.mutable_executor_id()->MergeFrom(executorId);
-  message.set_status(status);
-  send(master, message);
-
-  framework->destroyExecutor(executor->id);
+  if (!isCommandExecutor) {
+    ExitedExecutorMessage message;
+    message.mutable_slave_id()->MergeFrom(id);
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_executor_id()->MergeFrom(executorId);
+    message.set_status(status);
 
-  // Cleanup if this framework has nothing running.
-  if (framework->executors.size() == 0) {
-    // TODO(benh): But there might be some remaining status updates
-    // that haven't been acknowledged!
-    frameworks.erase(framework->id);
-    delete framework;
+    send(master, message);
   }
+
+  framework->destroyExecutor(executor->id);
 }
 
 
@@ -1441,26 +1478,8 @@ void Slave::shutdownExecutorTimeout(cons
              &IsolationModule::killExecutor,
              framework->id, executor->id);
 
-    ExitedExecutorMessage message;
-    message.mutable_slave_id()->MergeFrom(id);
-    message.mutable_framework_id()->MergeFrom(frameworkId);
-    message.mutable_executor_id()->MergeFrom(executorId);
-    message.set_status(-1);
-    send(master, message);
-
-    // TODO(benh): Send status updates for remaining tasks here rather
-    // than at the master! As in, eliminate the code in
-    // Master::exitedExecutor and put it here.
-
-    framework->destroyExecutor(executor->id);
-
-    // Cleanup if this framework has nothing running.
-    if (framework->executors.size() == 0) {
-      // TODO(benh): But there might be some remaining status updates
-      // that haven't been acknowledged!
-      frameworks.erase(framework->id);
-      delete framework;
-    }
+    // Call executorExited here to transition live tasks to LOST/FAILED.
+    executorExited(frameworkId, executorId, 0);
   }
 }
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1330075&r1=1330074&r2=1330075&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 25 00:23:22 2012
@@ -98,6 +98,12 @@ public:
 
   void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
 
+  StatusUpdate createStatusUpdate(const TaskID& taskId,
+                                  const ExecutorID& executorId,
+                                  const FrameworkID& frameworkId,
+                                  TaskState taskState,
+                                  const std::string& message); // 'reason' in the .cpp definition.
+
   void executorStarted(const FrameworkID& frameworkId,
                        const ExecutorID& executorId,
                        pid_t pid);
@@ -106,6 +112,11 @@ public:
                       const ExecutorID& executorId,
                       int status);
 
+  void transitionLiveTask(const TaskID& taskId,
+                          const ExecutorID& executorId,
+                          const FrameworkID& frameworkId,
+                          bool isCommandExecutor, // true => TASK_FAILED, else TASK_LOST.
+                          int status); // Not read by the definition.
 protected:
   virtual void initialize();
   virtual void finalize();