You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this conversion.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/23 23:17:59 UTC

[03/10] mesos git commit: Kept TaskInfo beyond first scheduler ack in default executor.

Kept TaskInfo beyond first scheduler ack in default executor.

Instead of maintaining a separate collection for unacknowledged tasks,
we augment the internal `Container` struct with the corresponding `TaskInfo`
and an `acknowledged` flag. This way we are still able to find all
unacknowledged tasks (slightly less efficiently than before, since now
we have to iterate through all tasks), but we also keep `TaskInfo`s
beyond receiving the first acknowledgement.

Review: https://reviews.apache.org/r/57695/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5567edc9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5567edc9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5567edc9

Branch: refs/heads/master
Commit: 5567edc9df7cab25f3b1a834777c6cae414a104e
Parents: 8093c86
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Thu Mar 23 17:11:18 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 00:17:27 2017 +0100

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 48 +++++++++++++++++++++++-----------
 1 file changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5567edc9/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 5d99def..f83b189 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -86,7 +86,7 @@ private:
   struct Container
   {
     ContainerID containerId;
-    TaskID taskId;
+    TaskInfo taskInfo;
     TaskGroupInfo taskGroup; // Task group of the child container.
 
     // Health checker for the container.
@@ -97,6 +97,10 @@ private:
     // `WAIT_NESTED_CONTAINER` call has not been established yet.
     Option<Connection> waiting;
 
+    // Indicates whether a status update acknowledgement
+    // has been received for any status update.
+    bool acknowledged;
+
     // Set to true if the child container is in the process of being killed.
     bool killing;
 
@@ -207,8 +211,14 @@ public:
         // Remove the corresponding update.
         unacknowledgedUpdates.erase(uuid);
 
-        // Remove the corresponding task.
-        unacknowledgedTasks.erase(event.acknowledged().task_id());
+        // Mark the corresponding task as acknowledged. An acknowledgement
+        // may be received after the task has already been removed from
+        // `containers`.
+        const TaskID taskId = event.acknowledged().task_id();
+        if (containers.contains(taskId)) {
+          containers.at(taskId)->acknowledged = true;
+        }
+
         break;
       }
 
@@ -269,9 +279,14 @@ protected:
       subscribe->add_unacknowledged_updates()->MergeFrom(update);
     }
 
-    // Send the unacknowledged tasks.
-    foreachvalue (const TaskInfo& task, unacknowledgedTasks) {
-      subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+    // Send all unacknowledged tasks. We don't send unacknowledged terminated
+    // (and hence already removed from `containers`) tasks, because for such
+    // tasks `WAIT_NESTED_CONTAINER` call has already succeeded, meaning the
+    // agent knows about the tasks and corresponding containers.
+    foreachvalue (const Owned<Container>& container, containers) {
+      if (!container->acknowledged) {
+        subscribe->add_unacknowledged_tasks()->MergeFrom(container->taskInfo);
+      }
     }
 
     mesos->send(evolve(call));
@@ -445,9 +460,15 @@ protected:
       const TaskInfo& task = taskGroup.tasks().Get(index++);
       const TaskID& taskId = task.task_id();
 
-      unacknowledgedTasks[taskId] = task;
-      containers[taskId] = Owned<Container>(new Container
-        {containerId, taskId, taskGroup, None(), None(), false, false});
+      containers[taskId] = Owned<Container>(new Container{
+        containerId,
+        task,
+        taskGroup,
+        None(),
+        None(),
+        false,
+        false,
+        false});
 
       if (task.has_health_check()) {
         // TODO(anand): Add support for command health checks.
@@ -632,7 +653,7 @@ protected:
     auto retry_ = [this, container]() mutable {
       container->waiting->disconnect();
       container->waiting = None();
-      retry(connectionId.get(), container->taskId);
+      retry(connectionId.get(), container->taskInfo.task_id());
     };
 
     // It is possible that the response failed due to a network blip
@@ -759,7 +780,8 @@ protected:
 
         // Ignore if it's the same task that triggered this callback or
         // if the task is no longer active.
-        if (taskId == container->taskId || !containers.contains(taskId)) {
+        if (taskId == container->taskInfo.task_id() ||
+            !containers.contains(taskId)) {
           continue;
         }
 
@@ -1097,10 +1119,6 @@ private:
 
   LinkedHashMap<UUID, Call::Update> unacknowledgedUpdates;
 
-  // A task is considered unacknowledged if no status update
-  // acknowledgements have been received for it yet.
-  LinkedHashMap<TaskID, TaskInfo> unacknowledgedTasks;
-
   // Active child containers.
   LinkedHashMap<TaskID, Owned<Container>> containers;