You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/15 21:21:12 UTC

[11/13] mesos git commit: Kept TaskInfo beyond first scheduler ack in command executor.

Kept TaskInfo beyond first scheduler ack in command executor.

Prior to this patch, the command executor wiped its cached TaskInfo
after receiving a status update acknowledgement from the scheduler,
using the emptied value to indicate that there are no unacknowledged
tasks. Keeping the original TaskInfo beyond the ack can be beneficial,
hence we introduce a struct TaskData that holds the TaskInfo and an
explicit ack flag.

Review: https://reviews.apache.org/r/57596


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a67231b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a67231b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a67231b0

Branch: refs/heads/master
Commit: a67231b004202a196370d6e6ea83fd75f76c7beb
Parents: 6b332ad
Author: Alexander Rukletsov <al...@apache.org>
Authored: Tue Mar 14 12:36:09 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Mar 15 22:20:20 2017 +0100

----------------------------------------------------------------------
 src/launcher/executor.cpp | 66 +++++++++++++++++++++++++-----------------
 1 file changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a67231b0/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 57e4a32..02834c9 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -125,6 +125,7 @@ public:
       const Duration& _shutdownGracePeriod)
     : ProcessBase(process::ID::generate("command-executor")),
       state(DISCONNECTED),
+      taskData(None()),
       launched(false),
       killed(false),
       killedByHealthCheck(false),
@@ -143,7 +144,6 @@ public:
       capabilities(_capabilities),
       frameworkId(_frameworkId),
       executorId(_executorId),
-      unacknowledgedTask(None()),
       lastTaskStatus(None())
   {
 #ifdef __WINDOWS__
@@ -229,8 +229,10 @@ public:
         // Remove the corresponding update.
         unacknowledgedUpdates.erase(uuid);
 
-        // Remove the corresponding task.
-        unacknowledgedTask = None();
+        // Mark task as acknowledged.
+        CHECK(taskData.isSome());
+        taskData->acknowledged = true;
+
         break;
       }
 
@@ -348,9 +350,9 @@ protected:
     }
 
     // Send the unacknowledged task.
-    if (unacknowledgedTask.isSome()) {
+    if (taskData.isSome() && !taskData->acknowledged) {
       subscribe->add_unacknowledged_tasks()->MergeFrom(
-          unacknowledgedTask.get());
+          taskData->taskInfo);
     }
 
     mesos->send(evolve(call));
@@ -358,13 +360,13 @@ protected:
     delay(Seconds(1), self(), &Self::doReliableRegistration);
   }
 
-  void launch(const TaskInfo& _task)
+  void launch(const TaskInfo& task)
   {
     CHECK_EQ(SUBSCRIBED, state);
 
     if (launched) {
       TaskStatus status = createTaskStatus(
-          _task.task_id(),
+          task.task_id(),
           TASK_FAILED,
           None(),
           "Attempted to run multiple tasks using a \"command\" executor");
@@ -374,14 +376,15 @@ protected:
     }
 
     // Capture the task.
-    unacknowledgedTask = _task;
+    CHECK(taskData.isNone());
+    taskData = TaskData(task);
 
     // Capture the TaskID.
-    taskId = unacknowledgedTask->task_id();
+    taskId = task.task_id();
 
     // Capture the kill policy.
-    if (unacknowledgedTask->has_kill_policy()) {
-      killPolicy = unacknowledgedTask->kill_policy();
+    if (task.has_kill_policy()) {
+      killPolicy = task.kill_policy();
     }
 
     // Determine the command to launch the task.
@@ -400,10 +403,10 @@ protected:
       }
 
       command = parse.get();
-    } else if (unacknowledgedTask->has_command()) {
-      command = unacknowledgedTask->command();
+    } else if (task.has_command()) {
+      command = task.command();
     } else {
-      LOG(FATAL) << "Expecting task '" << unacknowledgedTask->task_id()
+      LOG(FATAL) << "Expecting task '" << taskData->taskInfo.task_id()
                  << "' to have a command";
     }
 
@@ -414,11 +417,11 @@ protected:
     // correct solution is to perform this validation at master side.
     if (command.shell()) {
       CHECK(command.has_value())
-        << "Shell command of task '" << unacknowledgedTask->task_id()
+        << "Shell command of task '" << taskData->taskInfo.task_id()
         << "' is not specified!";
     } else {
       CHECK(command.has_value())
-        << "Executable of task '" << unacknowledgedTask->task_id()
+        << "Executable of task '" << taskData->taskInfo.task_id()
         << "' is not specified!";
     }
 
@@ -447,7 +450,7 @@ protected:
       launchEnvironment.MergeFrom(command.environment());
     }
 
-    cout << "Starting task " << unacknowledgedTask->task_id() << endl;
+    cout << "Starting task " << taskData->taskInfo.task_id() << endl;
 
 #ifndef __WINDOWS__
     pid = launchTaskPosix(
@@ -475,10 +478,10 @@ protected:
 
     cout << "Forked command at " << pid << endl;
 
-    if (unacknowledgedTask->has_health_check()) {
+    if (task.has_health_check()) {
       vector<string> namespaces;
       if (rootfs.isSome() &&
-          unacknowledgedTask->health_check().type() == HealthCheck::COMMAND) {
+          task.health_check().type() == HealthCheck::COMMAND) {
         // Make sure command health checks are run from the task's mount
         // namespace. Otherwise if rootfs is specified the command binary
         // may not be available in the executor.
@@ -490,10 +493,10 @@ protected:
 
       Try<Owned<checks::HealthChecker>> _healthChecker =
         checks::HealthChecker::create(
-            unacknowledgedTask->health_check(),
+            task.health_check(),
             launcherDir,
             defer(self(), &Self::taskHealthUpdated, lambda::_1),
-            unacknowledgedTask->task_id(),
+            taskData->taskInfo.task_id(),
             pid,
             namespaces);
 
@@ -511,7 +514,7 @@ protected:
       .onAny(defer(self(), &Self::reaped, pid, lambda::_1));
 
     TaskStatus status =
-      createTaskStatus(unacknowledgedTask->task_id(), TASK_RUNNING);
+      createTaskStatus(taskData->taskInfo.task_id(), TASK_RUNNING);
 
     forward(status);
     launched = true;
@@ -848,6 +851,21 @@ private:
     SUBSCRIBED
   } state;
 
+  struct TaskData
+  {
+    explicit TaskData(const TaskInfo& _taskInfo)
+      : taskInfo(_taskInfo), acknowledged(false) {}
+
+    TaskInfo taskInfo;
+
+    // Indicates whether a status update acknowledgement
+    // has been received for any status update.
+    bool acknowledged;
+  };
+
+  // Once `TaskInfo` is received, it is cached for later access.
+  Option<TaskData> taskData;
+
   // TODO(alexr): Introduce a state enum and document transitions,
   // see MESOS-5252.
   bool launched;
@@ -880,10 +898,6 @@ private:
 
   LinkedHashMap<UUID, Call::Update> unacknowledgedUpdates;
 
-  // `None` if there is either no task yet or no status
-  // update acknowledgements have been received yet.
-  Option<TaskInfo> unacknowledgedTask;
-
   Option<TaskStatus> lastTaskStatus;
 
   Owned<checks::HealthChecker> healthChecker;