You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/12/18 23:24:06 UTC

[mesos] 01/05: Made master process all authorization results for `LAUNCH_GROUP`.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 51a48371aa2566955ca5aead6a95a19b0ecfd0be
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Fri Dec 14 11:01:29 2018 -0800

    Made master process all authorization results for `LAUNCH_GROUP`.
    
    This patch fixes a bug where the master does not complete the processing
    of authorization results for `LAUNCH_GROUP`, causing a subsequent
    operation to drop if one of the remaining authorization is denied.
    
    Review: https://reviews.apache.org/r/69571
---
 src/master/master.cpp | 79 +++++++++++++++++++++++++++++----------------------
 1 file changed, 45 insertions(+), 34 deletions(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6ae53c1..e5fe40e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4754,8 +4754,8 @@ void Master::_accept(
   // The order of the conversions is important and preserved.
   vector<ResourceConversion> conversions;
 
-  // The order of `authorizations` must match the order of the operations in
-  // `accept.operations()`, as they are iterated through simultaneously.
+  // The order of `authorizations` must match the order of the operations and/or
+  // tasks in `accept.operations()` as they are iterated through simultaneously.
   CHECK_READY(_authorizations);
   std::deque<Future<bool>> authorizations(
       _authorizations->begin(), _authorizations->end());
@@ -4764,6 +4764,7 @@ void Master::_accept(
     switch (operation.type()) {
       // The RESERVE operation allows a principal to reserve resources.
       case Offer::Operation::RESERVE: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -4839,6 +4840,7 @@ void Master::_accept(
 
       // The UNRESERVE operation allows a principal to unreserve resources.
       case Offer::Operation::UNRESERVE: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -4904,6 +4906,7 @@ void Master::_accept(
       }
 
       case Offer::Operation::CREATE: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -4981,6 +4984,7 @@ void Master::_accept(
       }
 
       case Offer::Operation::DESTROY: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -5074,6 +5078,7 @@ void Master::_accept(
       }
 
       case Offer::Operation::GROW_VOLUME: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -5156,6 +5161,7 @@ void Master::_accept(
       }
 
       case Offer::Operation::SHRINK_VOLUME: {
+        CHECK(!authorizations.empty());
         Future<bool> authorization = authorizations.front();
         authorizations.pop_front();
 
@@ -5239,6 +5245,7 @@ void Master::_accept(
 
       case Offer::Operation::LAUNCH: {
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
+          CHECK(!authorizations.empty());
           Future<bool> authorization = authorizations.front();
           authorizations.pop_front();
 
@@ -5474,51 +5481,50 @@ void Master::_accept(
         // TODO(bmahler): Consider injecting some default (cpus, mem, disk)
         // resources when the framework omits the executor resources.
 
-        // See if there are any validation or authorization errors.
+        // See if there are any authorization or validation errors.
         // Note that we'll only report the first error we encounter
         // for the group.
         //
         // TODO(anindya_sinha): If task group uses shared resources, this
         // validation needs to be enhanced to accommodate multiple copies
         // of shared resources across tasks within the task group.
-        Option<Error> error = validation::task::group::validate(
-            taskGroup, executor, framework, slave, _offeredResources);
+        Option<Error> error;
+        Option<TaskStatus::Reason> reason;
 
-        Option<TaskStatus::Reason> reason = None();
-
-        if (error.isSome()) {
-          reason = TaskStatus::REASON_TASK_GROUP_INVALID;
-        } else {
-          foreach (const TaskInfo& task, taskGroup.tasks()) {
-            Future<bool> authorization = authorizations.front();
-            authorizations.pop_front();
-
-            CHECK(!authorization.isDiscarded());
-
-            if (authorization.isFailed()) {
-              error = Error("Failed to authorize task"
-                            " '" + stringify(task.task_id()) + "'"
-                            ": " + authorization.failure());
+        // NOTE: We check for the authorization errors first and never break the
+        // loop to ensure that all authorization futures for this task group are
+        // iterated through.
+        foreach (const TaskInfo& task, taskGroup.tasks()) {
+          CHECK(!authorizations.empty());
+          Future<bool> authorization = authorizations.front();
+          authorizations.pop_front();
 
-              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+          CHECK(!authorization.isDiscarded());
 
-              break;
+          if (authorization.isFailed()) {
+            error = Error("Failed to authorize task"
+                          " '" + stringify(task.task_id()) + "'"
+                          ": " + authorization.failure());
+          } else if (!authorization.get()) {
+            string user = framework->info.user(); // Default user.
+            if (task.has_command() && task.command().has_user()) {
+              user = task.command().user();
             }
 
-            if (!authorization.get()) {
-              string user = framework->info.user(); // Default user.
-              if (task.has_command() && task.command().has_user()) {
-                user = task.command().user();
-              }
-
-              error = Error("Task '" + stringify(task.task_id()) + "'"
-                            " is not authorized to launch as"
-                            " user '" + user + "'");
+            error = Error("Task '" + stringify(task.task_id()) + "'"
+                          " is not authorized to launch as"
+                          " user '" + user + "'");
+          }
+        }
 
-              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+        if (error.isSome()) {
+          reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+        } else {
+          error = validation::task::group::validate(
+              taskGroup, executor, framework, slave, _offeredResources);
 
-              break;
-            }
+          if (error.isSome()) {
+            reason = TaskStatus::REASON_TASK_GROUP_INVALID;
           }
         }
 
@@ -5751,6 +5757,11 @@ void Master::_accept(
     }
   }
 
+  CHECK(authorizations.empty())
+    << "Authorization results not processed: "
+    << stringify(
+           vector<Future<bool>>(authorizations.begin(), authorizations.end()));
+
   // Update the allocator based on the operations.
   if (!conversions.empty()) {
     allocator->updateAllocation(