You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/03/27 09:38:48 UTC

[6/8] mesos git commit: Avoided copying `Owned` pointers in the default executor.

Avoided copying `Owned` pointers in the default executor.

`Owned` pointers are copied in multiple places of the default executor.
This violates the semantics of owned pointers and works only because
`Owned` is currently implemented with `shared_ptr`; otherwise it would
lead to double-freeing the pointers.

This patch changes those places to use references to the original
`Owned` objects or raw pointers instead of copies.

Review: https://reviews.apache.org/r/65962/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cbc7d0e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cbc7d0e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cbc7d0e2

Branch: refs/heads/master
Commit: cbc7d0e2654310063d6ed856b40e3775901e2333
Parents: 7d0b448
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Mar 27 11:37:59 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Tue Mar 27 11:37:59 2018 +0200

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 52 ++++++++++++++--------------------
 1 file changed, 21 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cbc7d0e2/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 57bcdf9..95c4a47 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -172,7 +172,7 @@ public:
 
     // Disconnect all active connections used for
     // waiting on child containers.
-    foreachvalue (Owned<Container> container, containers) {
+    foreachvalue (Owned<Container>& container, containers) {
       if (container->waiting.isSome()) {
         container->waiting->disconnect();
         container->waiting = None();
@@ -180,7 +180,7 @@ public:
     }
 
     // Pause all checks and health checks.
-    foreachvalue (Owned<Container> container, containers) {
+    foreachvalue (Owned<Container>& container, containers) {
       if (container->checker.isSome()) {
         container->checker->get()->pause();
       }
@@ -214,7 +214,7 @@ public:
         }
 
         // Resume all checks and health checks.
-        foreachvalue (Owned<Container> container, containers) {
+        foreachvalue (Owned<Container>& container, containers) {
           if (container->checker.isSome()) {
             container->checker->get()->resume();
           }
@@ -726,7 +726,7 @@ protected:
     CHECK_SOME(connectionId);
     CHECK(containers.contains(taskId));
 
-    Owned<Container> container = containers.at(taskId);
+    Container* container = containers.at(taskId).get();
 
     LOG(INFO) << "Waiting for child container " << container->containerId
               << " of task '" << taskId << "'";
@@ -767,7 +767,7 @@ protected:
     CHECK_EQ(SUBSCRIBED, state);
     CHECK(containers.contains(taskId));
 
-    Owned<Container> container = containers.at(taskId);
+    Container* container = containers.at(taskId).get();
 
     CHECK_SOME(container->waiting);
 
@@ -932,34 +932,16 @@ protected:
 
     forward(taskStatus);
 
-    CHECK(containers.contains(taskId));
-    containers.erase(taskId);
-
     LOG(INFO)
       << "Child container " << container->containerId << " of task '" << taskId
       << "' completed in state " << stringify(taskState)
       << ": " << message.get();
 
-    // Shutdown the executor if all the active child containers have terminated.
-    if (containers.empty()) {
-      _shutdown();
-      return;
-    }
-
-    // Ignore if the executor is already in the process of shutting down.
-    if (shuttingDown) {
-      return;
-    }
-
-    // Ignore if this task group is already in the process of being killed.
-    if (container->killingTaskGroup) {
-      return;
-    }
-
     // The default restart policy for a task group is to kill all the
     // remaining child containers if one of them terminated with a
     // non-zero exit code.
-    if (taskState == TASK_FAILED || taskState == TASK_KILLED) {
+    if (!shuttingDown && !container->killingTaskGroup &&
+        (taskState == TASK_FAILED || taskState == TASK_KILLED)) {
       // Needed for logging.
       auto taskIds = [container]() {
         list<TaskID> taskIds_;
@@ -985,7 +967,7 @@ protected:
           continue;
         }
 
-        Owned<Container> container_ = containers.at(taskId);
+        Container* container_ = containers.at(taskId).get();
         container_->killingTaskGroup = true;
 
         // Ignore if the task is already being killed. This can happen
@@ -996,9 +978,17 @@ protected:
           continue;
         }
 
-        kill(container_.get());
+        kill(container_);
       }
     }
+
+    CHECK(containers.contains(taskId));
+    containers.erase(taskId);
+
+    // Shutdown the executor if all the active child containers have terminated.
+    if (containers.empty()) {
+      _shutdown();
+    }
   }
 
   void shutdown()
@@ -1238,7 +1228,7 @@ protected:
       return;
     }
 
-    const Owned<Container>& container = containers.at(taskId);
+    Container* container = containers.at(taskId).get();
     if (container->killing) {
       LOG(WARNING) << "Ignoring kill for task '" << taskId
                    << "' as it is in the process of getting killed";
@@ -1246,7 +1236,7 @@ protected:
     }
 
     const ContainerID& containerId = container->containerId;
-    kill(container.get(), killPolicy)
+    kill(container, killPolicy)
       .onFailed(defer(self(), [=](const string& failure) {
           LOG(WARNING) << "Failed to kill the task '" << taskId
                        << "' running in child container " << containerId << ": "
@@ -1386,7 +1376,7 @@ private:
     }
 
     CHECK(containers.contains(taskId));
-    const Owned<Container>& container = containers.at(taskId);
+    const Container* container = containers.at(taskId).get();
 
     // TODO(alexr): Augment health information in a way similar to
     // `CheckStatusInfo`. See MESOS-6417 for more details.
@@ -1504,7 +1494,7 @@ private:
     CHECK_SOME(connectionId);
     CHECK(containers.contains(taskId));
 
-    const Owned<Container>& container = containers.at(taskId);
+    const Container* container = containers.at(taskId).get();
 
     if (!connection.isReady()) {
       LOG(ERROR)