You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/23 23:18:02 UTC

[06/10] mesos git commit: Reused previous task status to generate a new one in default executor.

Reused previous task status to generate a new one in default executor.

When the executor generates a new task status update, it sometimes
has to carry over specific data from the previous task status, e.g.,
so that this data is not shadowed during reconciliation. For instance,
when a check status is sent, that status update must include the
latest known health information.

This patch also refactors the `update()` routine into two separate
helpers: `createTaskStatus()`, which is responsible for creating a task
status from scratch, and `forward()`, which is responsible for
forwarding task status updates to the agent (and for recording the
forwarded status as the container's last known task status).

Review: https://reviews.apache.org/r/56215/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb86531e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb86531e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb86531e

Branch: refs/heads/master
Commit: fb86531e9ece829bfac994d9d7d7242a16ff8fba
Parents: 5567edc
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Thu Mar 23 17:11:23 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 00:17:27 2017 +0100

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 67 +++++++++++++++++++++++++---------
 1 file changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fb86531e/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index f83b189..f80e79e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -89,6 +89,8 @@ private:
     TaskInfo taskInfo;
     TaskGroupInfo taskGroup; // Task group of the child container.
 
+    Option<TaskStatus> lastTaskStatus;
+
     // Health checker for the container.
     Option<Owned<checks::HealthChecker>> healthChecker;
 
@@ -466,6 +468,7 @@ protected:
         taskGroup,
         None(),
         None(),
+        None(),
         false,
         false,
         false});
@@ -523,7 +526,8 @@ protected:
     // Send a TASK_RUNNING status update now that the task group has
     // been successfully launched.
     foreach (const TaskInfo& task, taskGroup.tasks()) {
-      update(task.task_id(), TASK_RUNNING);
+      const TaskStatus status = createTaskStatus(task.task_id(), TASK_RUNNING);
+      forward(status);
     }
 
     auto taskIds = [&taskGroup]() {
@@ -726,12 +730,19 @@ protected:
       message = "Command " + WSTRINGIFY(status.get());
     }
 
+    TaskStatus taskStatus = createTaskStatus(
+        taskId,
+        taskState,
+        None(),
+        message);
+
+    // Indicate that a task has been unhealthy upon termination.
     if (unhealthy) {
-      update(taskId, taskState, message, false);
-    } else {
-      update(taskId, taskState, message, None());
+      taskStatus.set_healthy(false);
     }
 
+    forward(taskStatus);
+
     CHECK(containers.contains(taskId));
     containers.erase(taskId);
 
@@ -954,8 +965,21 @@ protected:
               << " '" << healthStatus.task_id() << "', task is "
               << (healthStatus.healthy() ? "healthy" : "not healthy");
 
-    update(
-        healthStatus.task_id(), TASK_RUNNING, None(), healthStatus.healthy());
+    // Use the previous task status to preserve all attached information.
+    // We always send a `TASK_RUNNING` right after the task is launched.
+    CHECK_SOME(containers.at(healthStatus.task_id())->lastTaskStatus);
+    const TaskStatus status = protobuf::createTaskStatus(
+        containers.at(healthStatus.task_id())->lastTaskStatus.get(),
+        UUID::random(),
+        Clock::now().secs(),
+        None(),
+        None(),
+        None(),
+        None(),
+        None(),
+        healthStatus.healthy());
+
+    forward(status);
 
     if (healthStatus.kill_task()) {
       unhealthy = true;
@@ -964,29 +988,29 @@ protected:
   }
 
 private:
-  void update(
+  // Use this helper to create a status update from scratch, i.e., without
+  // previously attached extra information like `data` or `check_status`.
+  TaskStatus createTaskStatus(
       const TaskID& taskId,
       const TaskState& state,
-      const Option<string>& message = None(),
-      const Option<bool>& healthy = None())
+      const Option<TaskStatus::Reason>& reason = None(),
+      const Option<string>& message = None())
   {
-    UUID uuid = UUID::random();
-
     TaskStatus status = protobuf::createTaskStatus(
         taskId,
         state,
-        uuid,
+        UUID::random(),
         Clock::now().secs());
 
     status.mutable_executor_id()->CopyFrom(executorId);
     status.set_source(TaskStatus::SOURCE_EXECUTOR);
 
-    if (message.isSome()) {
-      status.set_message(message.get());
+    if (reason.isSome()) {
+      status.set_reason(reason.get());
     }
 
-    if (healthy.isSome()) {
-      status.set_healthy(healthy.get());
+    if (message.isSome()) {
+      status.set_message(message.get());
     }
 
     // Fill the container ID associated with this task.
@@ -996,6 +1020,11 @@ private:
     ContainerStatus* containerStatus = status.mutable_container_status();
     containerStatus->mutable_container_id()->CopyFrom(container->containerId);
 
+    return status;
+  }
+
+  void forward(const TaskStatus& status)
+  {
     Call call;
     call.set_type(Call::UPDATE);
 
@@ -1005,7 +1034,11 @@ private:
     call.mutable_update()->mutable_status()->CopyFrom(status);
 
     // Capture the status update.
-    unacknowledgedUpdates[uuid] = call.update();
+    unacknowledgedUpdates[UUID::fromBytes(status.uuid()).get()] = call.update();
+
+    // Overwrite the last task status.
+    CHECK(containers.contains(status.task_id()));
+    containers.at(status.task_id())->lastTaskStatus = status;
 
     mesos->send(evolve(call));
   }