You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/03 07:52:53 UTC

[mesos] 06/08: Converted ACCEPT to synchronous authorization.

This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 495db0e983222a5038a26b93e0ca689f2ce009a0
Author: Andrei Sekretenko <as...@mesosphere.com>
AuthorDate: Wed Jan 29 15:24:32 2020 +0100

    Converted ACCEPT to synchronous authorization.
    
    This patch converts ACCEPT call to synchronous authorization
    (see MESOS-10056), thus fixing race between ACCEPT and REVIVE
    (MESOS-10023) and removing potential for other similar races.
    
    It also moves authorization of scheduler API operations after their
    validation (thus fixing MESOS-10083) and effectively gets rid of the
    concept of a "task pending authorization".
    
    Tests are converted from mocking `Authorizer::authorized(...)`
    to mocking `Authorizer::provideObjectApprover(...)` as necessary.
    
    Review: https://reviews.apache.org/r/72098
---
 src/master/authorization.cpp             |  16 +-
 src/master/framework.cpp                 |  16 +-
 src/master/master.cpp                    | 478 +++++-----------------
 src/master/master.hpp                    |   4 +-
 src/tests/master_authorization_tests.cpp | 657 ++-----------------------------
 src/tests/master_tests.cpp               | 103 -----
 src/tests/reconciliation_tests.cpp       |  86 ----
 7 files changed, 149 insertions(+), 1211 deletions(-)

diff --git a/src/master/authorization.cpp b/src/master/authorization.cpp
index 77719eb..6dfa59a 100644
--- a/src/master/authorization.cpp
+++ b/src/master/authorization.cpp
@@ -387,10 +387,20 @@ ostream& operator<<(ostream& stream, const ActionObject& actionObject)
   }
 
   switch (actionObject.action()) {
-    case authorization::RUN_TASK:
+    case authorization::RUN_TASK: {
+      const TaskInfo& task = object->task_info();
+      const FrameworkInfo& framework = object->framework_info();
       return stream
-        << "launch task " << object->task_info().task_id()
-        << " of framework " << object->framework_info().id();
+        << "launch task " << task.task_id()
+        << " of framework " << framework.id()
+        << " under user '"
+        << (task.has_command() && task.command().has_user()
+            ? task.command().user()
+            : task.has_executor() && task.executor().command().has_user()
+              ? task.executor().command().user()
+              : framework.user())
+        << "'";
+    }
 
     case authorization::REGISTER_FRAMEWORK:
       return stream
diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index ffcf367..7e46469 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -740,7 +740,21 @@ Try<bool> Framework::approved(const ActionObject& actionObject) const
 
 
 constexpr std::initializer_list<authorization::Action> SCHEDULER_API_ACTIONS{
-  authorization::REGISTER_FRAMEWORK};
+  authorization::REGISTER_FRAMEWORK,
+  authorization::RUN_TASK,
+
+  authorization::UNRESERVE_RESOURCES,
+  authorization::RESERVE_RESOURCES,
+
+  authorization::CREATE_VOLUME,
+  authorization::DESTROY_VOLUME,
+  authorization::RESIZE_VOLUME,
+
+  authorization::CREATE_MOUNT_DISK,
+  authorization::CREATE_BLOCK_DISK,
+  authorization::DESTROY_MOUNT_DISK,
+  authorization::DESTROY_BLOCK_DISK,
+  authorization::DESTROY_RAW_DISK};
 
 
 Future<Owned<ObjectApprovers>> Framework::createObjectApprovers(
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 07685fe..b09ce8e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3721,8 +3721,6 @@ Future<bool> Master::authorize(
     *request.mutable_object() = *(std::move(actionObject).object());
   }
 
-  // TODO(asekretenko): Use a background-refreshed ObjectApprover
-  // when they become available (see MESOS-10056).
   return authorizer.get()->authorized(request);
 }
 
@@ -4317,126 +4315,18 @@ void Master::accept(
     }
   }
 
-  const Option<Principal> principal = framework->info.has_principal()
-    ? Principal(framework->info.principal())
-    : Option<Principal>::none();
-
-  // TODO(asekretenko): Use background-refreshed ObjectApprovers
-  // instead of asynchronous authorization.
-  vector<Future<bool>> futures;
-  for (const Offer::Operation& operation : accept.operations()) {
-    switch (operation.type()) {
-      case Offer::Operation::LAUNCH:
-      case Offer::Operation::LAUNCH_GROUP: {
-        for (const TaskInfo& task : getOperationTasks(operation)) {
-          futures.emplace_back(authorize(
-              principal, ActionObject::taskLaunch(task, framework->info)));
-        }
-        break;
-      }
-
-      // NOTE: When handling RESERVE and UNRESERVE operations, authorization
-      // will proceed even if no principal is specified, although currently
-      // resources cannot be reserved or unreserved unless a principal is
-      // provided. Any RESERVE/UNRESERVE operation with no associated principal
-      // will be found invalid when `validate()` is called in `_accept()` below.
-
-      // The RESERVE operation allows a principal to reserve resources.
-      case Offer::Operation::RESERVE: {
-        futures.push_back(authorize(
-            principal, ActionObject::reserve(operation.reserve())));
-
-        break;
-      }
-
-      // The UNRESERVE operation allows a principal to unreserve resources.
-      case Offer::Operation::UNRESERVE: {
-        futures.push_back(authorize(
-            principal, ActionObject::unreserve(operation.unreserve())));
-
-        break;
-      }
-
-      // The CREATE operation allows the creation of a persistent volume.
-      case Offer::Operation::CREATE: {
-        futures.push_back(authorize(
-            principal, ActionObject::createVolume(operation.create())));
-
-        break;
-      }
-
-      // The DESTROY operation allows the destruction of a persistent volume.
-      case Offer::Operation::DESTROY: {
-        futures.push_back(authorize(
-            principal, ActionObject::destroyVolume(operation.destroy())));
-
-        break;
-      }
-
-      case Offer::Operation::GROW_VOLUME: {
-        futures.push_back(authorize(
-            principal, ActionObject::growVolume(operation.grow_volume())));
-
-        break;
-      }
-
-      case Offer::Operation::SHRINK_VOLUME: {
-        futures.push_back(authorize(
-            principal, ActionObject::shrinkVolume(operation.shrink_volume())));
-
-        break;
-      }
-
-      case Offer::Operation::CREATE_DISK: {
-        Try<ActionObject> actionObject =
-          ActionObject::createDisk(operation.create_disk());
-
-        if (actionObject.isError()) {
-          futures.push_back(Failure(actionObject.error()));
-        } else {
-          futures.push_back(authorize(principal, std::move(*actionObject)));
-        }
-
-        break;
-      }
-
-      case Offer::Operation::DESTROY_DISK: {
-        Try<ActionObject> actionObject =
-          ActionObject::destroyDisk(operation.destroy_disk());
-
-        if (actionObject.isError()) {
-          futures.push_back(Failure(actionObject.error()));
-        } else {
-          futures.push_back(authorize(principal, std::move(*actionObject)));
-        }
-
-        break;
-      }
-
-      case Offer::Operation::UNKNOWN: {
-        // TODO(vinod): Send an error event to the scheduler?
-        LOG(WARNING) << "Ignoring unknown operation";
-        break;
-      }
-    }
-  }
-
-  // Wait for all the tasks to be authorized.
-  await(futures)
-    .onAny(defer(self(),
-                 &Master::_accept,
-                 framework->id(),
-                 slaveId,
-                 std::move(accept),
-                 lambda::_1));
+  // TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous
+  // authorization was introduced, used to be a deferred continuation of ACCEPT
+  // call processing, but now is kept only for limiting variable scopes) and
+  // handle operations one-by-one.
+  _accept(framework->id(), slaveId, std::move(accept));
 }
 
 
 void Master::_accept(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
-    scheduler::Call::Accept&& accept,
-    const Future<vector<Future<bool>>>& _authorizations)
+    scheduler::Call::Accept&& accept)
 {
   auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) {
     for (const OfferID& offerId : ids) {
@@ -4583,56 +4473,60 @@ void Master::_accept(
   // The order of the conversions is important and preserved.
   vector<ResourceConversion> conversions;
 
-  // The order of `authorizations` must match the order of the operations and/or
-  // tasks in `accept.operations()` as they are iterated through simultaneously.
-  CHECK_READY(_authorizations);
-  std::deque<Future<bool>> authorizations(
-      _authorizations->begin(), _authorizations->end());
-
   foreach (const Offer::Operation& operation, accept.operations()) {
-    switch (operation.type()) {
-      // The RESERVE operation allows a principal to reserve resources.
-      case Offer::Operation::RESERVE: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
+    auto authorized_ =
+      [&framework, &operation](const ActionObject& actionObject)
+        -> Option<Error> {
+      const Try<bool> authorized = framework->approved(actionObject);
+      if (authorized.isError()) {
+        return Error(
+            "Failed to authorize principal '" + framework->info.principal() +
+            "' to perform " + Offer::Operation::Type_Name(operation.type()) +
+            ": " + authorized.error());
+      }
 
-        CHECK(!authorization.isDiscarded());
+      if (!*authorized) {
+        return Error(
+            "Principal '" + framework->info.principal() +
+            "' no authorized to " + stringify(actionObject));
+      }
 
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to reserve resources failed: " + authorization.failure());
+      return None();
+    };
 
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to reserve resources as '" +
-               framework->info.principal() + "'");
+    auto authorized = overload(
+        authorized_,
+        [&authorized_](const vector<ActionObject>& actionObjects) {
+          for (const ActionObject& actionObject : actionObjects) {
+            const Option<Error> error = authorized_(actionObject);
+            if (error.isSome()) {
+              return error;
+            }
+          }
+
+          return Option<Error>::none();
+        });
 
-          continue;
-        }
 
+    switch (operation.type()) {
+      // The RESERVE operation allows a principal to reserve resources.
+      case Offer::Operation::RESERVE: {
         Option<Principal> principal = framework->info.has_principal()
                                         ? Principal(framework->info.principal())
                                         : Option<Principal>::none();
 
-        // Make sure this reserve operation is valid.
         Option<Error> error = validation::operation::validate(
             operation.reserve(),
             principal,
             slave->capabilities,
             framework->info);
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::reserve(operation.reserve()));
+
         if (error.isSome()) {
-          drop(
-              framework,
-              operation,
-              error->message + "; on agent " + stringify(*slave));
+          drop(framework, operation, error->message);
           continue;
         }
 
@@ -4669,35 +4563,13 @@ void Master::_accept(
 
       // The UNRESERVE operation allows a principal to unreserve resources.
       case Offer::Operation::UNRESERVE: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to unreserve resources failed: " +
-               authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to unreserve resources as '" +
-                 framework->info.principal() + "'");
-
-          continue;
-        }
-
-        // Make sure this unreserve operation is valid.
         Option<Error> error =
           validation::operation::validate(operation.unreserve());
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::unreserve(operation.unreserve()));
+
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
@@ -4735,31 +4607,6 @@ void Master::_accept(
       }
 
       case Offer::Operation::CREATE: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to create persistent volumes failed: " +
-               authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to create persistent volumes as '" +
-                 framework->info.principal() + "'");
-
-          continue;
-        }
-
         Option<Principal> principal = framework->info.has_principal()
                                         ? Principal(framework->info.principal())
                                         : Option<Principal>::none();
@@ -4772,11 +4619,12 @@ void Master::_accept(
             slave->capabilities,
             framework->info);
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::createVolume(operation.create()));
+
         if (error.isSome()) {
-          drop(
-              framework,
-              operation,
-              error->message + "; on agent " + stringify(*slave));
+          drop(framework, operation, error->message);
           continue;
         }
 
@@ -4813,38 +4661,16 @@ void Master::_accept(
       }
 
       case Offer::Operation::DESTROY: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to destroy persistent volumes failed: " +
-               authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to destroy persistent volumes as '" +
-                 framework->info.principal() + "'");
-
-          continue;
-        }
-
-        // Make sure this destroy operation is valid.
         Option<Error> error = validation::operation::validate(
             operation.destroy(),
             slave->checkpointedResources,
             slave->usedResources,
             slave->pendingTasks);
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::destroyVolume(operation.destroy()));
+
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
@@ -4901,40 +4727,15 @@ void Master::_accept(
       }
 
       case Offer::Operation::GROW_VOLUME: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to grow a volume failed: " +
-               authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to grow a volume as '" +
-                 framework->info.principal() + "'");
-
-          continue;
-        }
-
-        // Make sure this grow volume operation is valid.
         Option<Error> error = validation::operation::validate(
             operation.grow_volume(), slave->capabilities);
 
+        error = error.isSome() ?
+          Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::growVolume(operation.grow_volume()));
+
         if (error.isSome()) {
-          drop(
-              framework,
-              operation,
-              error->message + "; on agent " + stringify(*slave));
+          drop(framework, operation, error->message);
           continue;
         }
 
@@ -4984,40 +4785,15 @@ void Master::_accept(
       }
 
       case Offer::Operation::SHRINK_VOLUME: {
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-               "' to shrink a volume failed: " +
-               authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to shrink a volume as '" +
-                 framework->info.principal() + "'");
-
-          continue;
-        }
-
-        // Make sure this shrink volume operation is valid.
         Option<Error> error = validation::operation::validate(
             operation.shrink_volume(), slave->capabilities);
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(ActionObject::shrinkVolume(operation.shrink_volume()));
+
         if (error.isSome()) {
-          drop(
-              framework,
-              operation,
-              error->message + "; on agent " + stringify(*slave));
+          drop(framework, operation, error->message);
           continue;
         }
 
@@ -5068,10 +4844,6 @@ void Master::_accept(
 
       case Offer::Operation::LAUNCH: {
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
-          CHECK(!authorizations.empty());
-          Future<bool> authorization = authorizations.front();
-          authorizations.pop_front();
-
           // The task will not be in `pendingTasks` if it has been
           // killed in the interim. No need to send TASK_KILLED in
           // this case as it has already been sent. Note however that
@@ -5084,7 +4856,10 @@ void Master::_accept(
           // TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED
           // if a task was killed (removed from `pendingTasks`) *and*
           // the task is invalid or unauthorized here.
-
+          //
+          // TODO(asekretenko): Now that ACCEPT is authorized synchronously,
+          // master state cannot change while the task is being authorized,
+          // and all the code for tracking pending tasks can be removed.
           bool pending = framework->pendingTasks.contains(task.task_id());
           framework->pendingTasks.erase(task.task_id());
           slave->pendingTasks[framework->id()].erase(task.task_id());
@@ -5092,17 +4867,10 @@ void Master::_accept(
             slave->pendingTasks.erase(framework->id());
           }
 
-          CHECK(!authorization.isDiscarded());
-
-          if (authorization.isFailed() || !authorization.get()) {
-            string user = framework->info.user(); // Default user.
-            if (task.has_command() && task.command().has_user()) {
-              user = task.command().user();
-            } else if (task.has_executor() &&
-                       task.executor().command().has_user()) {
-              user = task.executor().command().user();
-            }
+          const Option<Error> authorizationError =
+            authorized(ActionObject::taskLaunch(task, framework->info));
 
+          if (authorizationError.isSome()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
                 framework->id(),
                 task.slave_id(),
@@ -5110,9 +4878,7 @@ void Master::_accept(
                 TASK_ERROR,
                 TaskStatus::SOURCE_MASTER,
                 None(),
-                authorization.isFailed() ?
-                    "Authorization failure: " + authorization.failure() :
-                    "Not authorized to launch as user '" + user + "'",
+                authorizationError->message,
                 TaskStatus::REASON_TASK_UNAUTHORIZED);
 
             metrics->tasks_error++;
@@ -5315,29 +5081,19 @@ void Master::_accept(
         Option<Error> error;
         Option<TaskStatus::Reason> reason;
 
-        // NOTE: We check for the authorization errors first and never break the
-        // loop to ensure that all authorization futures for this task group are
-        // iterated through.
         foreach (const TaskInfo& task, taskGroup.tasks()) {
-          CHECK(!authorizations.empty());
-          Future<bool> authorization = authorizations.front();
-          authorizations.pop_front();
+          const ActionObject actionObject =
+            ActionObject::taskLaunch(task, framework->info);
 
-          CHECK(!authorization.isDiscarded());
+          const Try<bool> approval = framework->approved(actionObject);
 
-          if (authorization.isFailed()) {
+          if (approval.isError()) {
             error = Error("Failed to authorize task"
                           " '" + stringify(task.task_id()) + "'"
-                          ": " + authorization.failure());
-          } else if (!authorization.get()) {
-            string user = framework->info.user(); // Default user.
-            if (task.has_command() && task.command().has_user()) {
-              user = task.command().user();
-            }
-
+                          ": " + approval.error());
+          } else if (!*approval) {
             error = Error("Task '" + stringify(task.task_id()) + "'"
-                          " is not authorized to launch as"
-                          " user '" + user + "'");
+                          " is not authorized to" + stringify(actionObject));
           }
         }
 
@@ -5498,34 +5254,6 @@ void Master::_accept(
       }
 
       case Offer::Operation::CREATE_DISK: {
-        const Resource::DiskInfo::Source::Type diskType =
-          operation.create_disk().target_type();
-
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-                 "' to create a " + stringify(diskType) + " disk failed: " +
-                 authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to create a " + stringify(diskType) +
-                 " disk as '" + framework->info.principal() + "'");
-
-          continue;
-        }
-
         if (!slave->capabilities.resourceProvider) {
           drop(framework,
                operation,
@@ -5537,6 +5265,11 @@ void Master::_accept(
         Option<Error> error = validation::operation::validate(
             operation.create_disk());
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(CHECK_NOTERROR(
+              ActionObject::createDisk(operation.create_disk())));
+
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
@@ -5565,34 +5298,6 @@ void Master::_accept(
       }
 
       case Offer::Operation::DESTROY_DISK: {
-        const Resource::DiskInfo::Source::Type diskType =
-          operation.destroy_disk().source().disk().source().type();
-
-        CHECK(!authorizations.empty());
-        Future<bool> authorization = authorizations.front();
-        authorizations.pop_front();
-
-        CHECK(!authorization.isDiscarded());
-
-        if (authorization.isFailed()) {
-          // TODO(greggomann): We may want to retry this failed authorization
-          // request rather than dropping it immediately.
-          drop(framework,
-               operation,
-               "Authorization of principal '" + framework->info.principal() +
-                 "' to destroy a " + stringify(diskType) + " disk failed: " +
-                 authorization.failure());
-
-          continue;
-        } else if (!authorization.get()) {
-          drop(framework,
-               operation,
-               "Not authorized to destroy a " + stringify(diskType) +
-                 " disk as '" + framework->info.principal() + "'");
-
-          continue;
-        }
-
         if (!slave->capabilities.resourceProvider) {
           drop(framework,
                operation,
@@ -5604,6 +5309,12 @@ void Master::_accept(
         Option<Error> error = validation::operation::validate(
             operation.destroy_disk());
 
+        error = error.isSome()
+          ? Error(error->message + "; on agent " + stringify(*slave))
+          : authorized(CHECK_NOTERROR(
+              ActionObject::destroyDisk(operation.destroy_disk())));
+
+
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
@@ -5638,11 +5349,6 @@ void Master::_accept(
     }
   }
 
-  CHECK(authorizations.empty())
-    << "Authorization results not processed: "
-    << stringify(
-           vector<Future<bool>>(authorizations.begin(), authorizations.end()));
-
   // Update the allocator based on the operations.
   if (!conversions.empty()) {
     allocator->updateAllocation(
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f766b5c..7281815 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -956,9 +956,7 @@ private:
   void _accept(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
-      mesos::scheduler::Call::Accept&& accept,
-      const process::Future<
-          std::vector<process::Future<bool>>>& authorizations);
+      mesos::scheduler::Call::Accept&& accept);
 
   void acceptInverseOffers(
       Framework* framework,
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 4074a18..2d8fa7c 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -390,628 +390,6 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTaskGroup)
   driver.join();
 }
 
-
-// This test verifies that a 'killTask()' that comes before
-// '_accept()' is called results in TASK_KILLED.
-TEST_F(MasterAuthorizationTest, KillTask)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  Future<Nothing> recoverResources =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
-  // Now kill the task.
-  driver.killTask(task.task_id());
-
-  // Framework should get a TASK_KILLED right away.
-  AWAIT_READY(status);
-  EXPECT_EQ(TASK_KILLED, status->state());
-  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason());
-
-  // Now complete authorization.
-  promise.set(true);
-
-  // No task launch should happen resulting in all resources being
-  // returned to the allocator.
-  AWAIT_READY(recoverResources);
-
-  // Make sure the task is not known to master anymore.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  driver.reconcileTasks({});
-
-  // We settle the clock here to ensure any updates sent by the master
-  // are received. There shouldn't be any updates in this case.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that if a pending task in a task group
-// is killed, then the entire group will be killed.
-TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  Future<FrameworkID> frameworkId;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(frameworkId);
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  Resources resources =
-    Resources::parse("cpus:0.1;mem:32;disk:32").get();
-
-  ExecutorInfo executor;
-  executor.set_type(ExecutorInfo::DEFAULT);
-  executor.mutable_executor_id()->set_value("E");
-  executor.mutable_framework_id()->CopyFrom(frameworkId.get());
-  executor.mutable_resources()->CopyFrom(resources);
-
-  TaskInfo task1;
-  task1.set_name("1");
-  task1.mutable_task_id()->set_value("1");
-  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task1.mutable_resources()->MergeFrom(resources);
-
-  TaskInfo task2;
-  task2.set_name("2");
-  task2.mutable_task_id()->set_value("2");
-  task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
-  task2.mutable_resources()->MergeFrom(resources);
-
-  TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(task1);
-  taskGroup.add_tasks()->CopyFrom(task2);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize1;
-  Future<Nothing> authorize2;
-  Promise<bool> promise1;
-  Promise<bool> promise2;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize1),
-                    Return(promise1.future())))
-    .WillOnce(DoAll(FutureSatisfy(&authorize2),
-                    Return(promise2.future())));
-
-  Future<TaskStatus> task1Status;
-  Future<TaskStatus> task2Status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&task1Status))
-    .WillOnce(FutureArg<1>(&task2Status));
-
-  Offer::Operation operation;
-  operation.set_type(Offer::Operation::LAUNCH_GROUP);
-
-  Offer::Operation::LaunchGroup* launchGroup =
-    operation.mutable_launch_group();
-
-  launchGroup->mutable_executor()->CopyFrom(executor);
-  launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
-  driver.acceptOffers({offers.get()[0].id()}, {operation});
-
-  // Wait until all authorizations are in progress.
-  AWAIT_READY(authorize1);
-  AWAIT_READY(authorize2);
-
-  Future<Nothing> recoverResources =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
-  // Now kill task1.
-  driver.killTask(task1.task_id());
-
-  AWAIT_READY(task1Status);
-  EXPECT_EQ(TASK_KILLED, task1Status->state());
-  EXPECT_TRUE(strings::contains(
-      task1Status->message(), "Killed before delivery to the agent"));
-  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-            task1Status->reason());
-
-  // Now complete authorizations for task1 and task2.
-  promise1.set(true);
-  promise2.set(true);
-
-  AWAIT_READY(task2Status);
-  EXPECT_EQ(TASK_KILLED, task2Status->state());
-  EXPECT_TRUE(strings::contains(
-      task2Status->message(),
-      "A task within the task group was killed before delivery to the agent"));
-  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-            task2Status->reason());
-
-  // No task launch should happen resulting in all resources being
-  // returned to the allocator.
-  AWAIT_READY(recoverResources);
-
-  // Make sure the task group is not known to master anymore.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  driver.reconcileTasks({});
-
-  // We settle the clock here to ensure any updates sent by the master
-  // are received. There shouldn't be any updates in this case.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_LOST for a framework that is
-// not partition-aware.
-TEST_F(MasterAuthorizationTest, SlaveRemovedLost)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<Nothing> slaveLost;
-  EXPECT_CALL(sched, slaveLost(&driver, _))
-    .WillOnce(FutureSatisfy(&slaveLost));
-
-  Future<Nothing> recoverResources =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
-  // Stop the slave with explicit shutdown as otherwise with
-  // checkpointing the master will wait for the slave to reconnect.
-  slave.get()->shutdown();
-  slave->reset();
-
-  AWAIT_READY(slaveLost);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Now complete authorization.
-  promise.set(true);
-
-  // Framework should get a TASK_LOST.
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_LOST, status->state());
-  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source());
-  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason());
-
-  // No task launch should happen resulting in all resources being
-  // returned to the allocator.
-  AWAIT_READY(recoverResources);
-
-  // Check metrics.
-  JSON::Object stats = Metrics();
-  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
-  EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(
-      1u, stats.values["master/task_lost/source_master/reason_slave_removed"]);
-
-  // Make sure the task is not known to master anymore.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  driver.reconcileTasks({});
-
-  // We settle the clock here to ensure any updates sent by the master
-  // are received. There shouldn't be any updates in this case.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_DROPPED for a framework that
-// is partition-aware.
-TEST_F(MasterAuthorizationTest, SlaveRemovedDropped)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      FrameworkInfo::Capability::PARTITION_AWARE);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<Nothing> slaveLost;
-  EXPECT_CALL(sched, slaveLost(&driver, _))
-    .WillOnce(FutureSatisfy(&slaveLost));
-
-  Future<Nothing> recoverResources =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
-  // Stop the slave with explicit shutdown as otherwise with
-  // checkpointing the master will wait for the slave to reconnect.
-  slave.get()->shutdown();
-  slave->reset();
-
-  AWAIT_READY(slaveLost);
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Now complete authorization.
-  promise.set(true);
-
-  // Framework should get a TASK_DROPPED.
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_DROPPED, status->state());
-  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source());
-  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason());
-
-  // No task launch should happen resulting in all resources being
-  // returned to the allocator.
-  AWAIT_READY(recoverResources);
-
-  // Check metrics.
-  JSON::Object stats = Metrics();
-  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
-  EXPECT_EQ(
-      1u,
-      stats.values["master/task_dropped/source_master/reason_slave_removed"]);
-
-  // Make sure the task is not known to master anymore.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  driver.reconcileTasks({});
-
-  // We settle the clock here to ensure any updates sent by the master
-  // are received. There shouldn't be any updates in this case.
-  Clock::pause();
-  Clock::settle();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that a framework removal that comes before
-// '_accept()' is called results in recovery of resources.
-TEST_F(MasterAuthorizationTest, FrameworkRemoved)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<Nothing> removeFramework =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
-
-  Future<Nothing> recoverResources =
-    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
-  // Now stop the framework.
-  driver.stop();
-  driver.join();
-
-  AWAIT_READY(removeFramework);
-
-  // Now complete authorization.
-  promise.set(true);
-
-  // No task launch should happen resulting in all resources being
-  // returned to the allocator.
-  AWAIT_READY(recoverResources);
-}
-
-
-// This test verifies that two tasks each launched on a different
-// slave with same executor id but different executor info are
-// allowed even when the first task is pending due to authorization.
-TEST_F(MasterAuthorizationTest, PendingExecutorInfoDiffersOnDifferentSlaves)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  Future<Nothing> registered;
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(FutureSatisfy(&registered));
-
-  driver.start();
-
-  AWAIT_READY(registered);
-
-  Future<vector<Offer>> offers1;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers1));
-
-  // Start the first slave.
-  MockExecutor exec1(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer1(&exec1);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave1 =
-    StartSlave(detector.get(), &containerizer1);
-  ASSERT_SOME(slave1);
-
-  AWAIT_READY(offers1);
-  ASSERT_FALSE(offers1->empty());
-
-  // Launch the first task with the default executor id.
-  ExecutorInfo executor1;
-  executor1 = DEFAULT_EXECUTOR_INFO;
-  executor1.mutable_command()->set_value("exit 1");
-
-  TaskInfo task1 = createTask(
-      offers1.get()[0], executor1.command().value(), executor1.executor_id());
-
-  // Return a pending future from authorizer.
-  // Note that we retire this expectation after its use because
-  // the authorizer will next be called when `slave2` registers and
-  // this expectation would be hit again (and be oversaturated) if
-  // we don't retire. New expectations on `authorizer` will be set
-  // after `slave2` is registered.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())))
-    .RetiresOnSaturation();
-
-  driver.launchTasks(offers1.get()[0].id(), {task1});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<vector<Offer>> offers2;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers2))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  // Now start the second slave.
-  MockExecutor exec2(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer2(&exec2);
-
-  Try<Owned<cluster::Slave>> slave2 =
-    StartSlave(detector.get(), &containerizer2);
-  ASSERT_SOME(slave2);
-
-  AWAIT_READY(offers2);
-  ASSERT_FALSE(offers2->empty());
-
-  // Now launch the second task with the same executor id but
-  // a different executor command.
-  ExecutorInfo executor2;
-  executor2 = executor1;
-  executor2.mutable_command()->set_value("exit 2");
-
-  TaskInfo task2 = createTask(
-      offers2.get()[0], executor2.command().value(), executor2.executor_id());
-
-  EXPECT_CALL(exec2, registered(_, _, _, _));
-
-  EXPECT_CALL(exec2, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> status2;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status2));
-
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(Return(true));
-
-  driver.launchTasks(offers2.get()[0].id(), {task2});
-
-  AWAIT_READY(status2);
-  ASSERT_EQ(TASK_RUNNING, status2->state());
-
-  EXPECT_CALL(exec1, registered(_, _, _, _));
-
-  EXPECT_CALL(exec1, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> status1;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status1));
-
-  // Complete authorization of 'task1'.
-  promise.set(true);
-
-  AWAIT_READY(status1);
-  ASSERT_EQ(TASK_RUNNING, status1->state());
-
-  EXPECT_CALL(exec1, shutdown(_))
-    .Times(AtMost(1));
-
-  EXPECT_CALL(exec2, shutdown(_))
-    .Times(AtMost(1));
-
-  driver.stop();
-  driver.join();
-}
-
-
 // This test verifies that a framework registration with authorized
 // role is successful.
 TEST_F(MasterAuthorizationTest, AuthorizedRole)
@@ -3206,6 +2584,23 @@ public:
 };
 
 
+class ControllableObjectApprover : public ObjectApprover
+{
+public:
+  ControllableObjectApprover(bool permissive_) : permissive(permissive_) {}
+  void disable() { permissive.store(false); }
+
+  Try<bool> approved(
+      const Option<ObjectApprover::Object>&) const noexcept override
+  {
+    return permissive.load();
+  }
+
+private:
+  std::atomic_bool permissive;
+};
+
+
 INSTANTIATE_TEST_CASE_P(
     AllowedAction,
     MasterOperationAuthorizationTest,
@@ -3232,14 +2627,18 @@ TEST_P(MasterOperationAuthorizationTest, Accept)
 {
   Clock::pause();
 
-  // We use this flag to control when the mock authorizer starts to deny
-  // disallowed actions.
-  std::atomic_bool permissive(true);
+  const auto controllableApprover =
+    std::make_shared<ControllableObjectApprover>(true);
 
   MockAuthorizer authorizer;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillRepeatedly(Invoke([&](const authorization::Request& request) {
-      return permissive || request.action() == GetParam();
+
+  const authorization::Action allowedAction = GetParam();
+  EXPECT_CALL(authorizer, getApprover(_, _))
+    .WillRepeatedly(Invoke([controllableApprover, allowedAction](
+                               const Option<authorization::Subject>&,
+                               const authorization::Action& action) {
+      return action == allowedAction ? getAcceptingObjectApprover()
+                                     : controllableApprover;
     }));
 
   Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -3493,7 +2892,7 @@ TEST_P(MasterOperationAuthorizationTest, Accept)
   EXPECT_EQ(frameworkId, subscribed->framework_id());
 
   // Start to deny disallowed actions.
-  permissive = false;
+  controllableApprover->disable();
 
   AWAIT_READY(offers);
   ASSERT_EQ(1, offers->offers_size());
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 9688f5f..c47d4c3 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -10572,109 +10572,6 @@ TEST_P(MasterTestPrePostReservationRefinement,
 }
 
 
-// This test verifies that hitting the `/state` endpoint before '_accept()'
-// is called results in pending tasks being reported correctly.
-TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks)
-{
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
-
-  // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
-  // is removed from `DEFAULT_FRAMEWORK_INFO`.
-  frameworkInfo.clear_capabilities();
-  frameworkInfo.add_capabilities()->set_type(
-      FrameworkInfo::Capability::MULTI_ROLE);
-
-  if (GetParam()) {
-    frameworkInfo.add_capabilities()->set_type(
-        FrameworkInfo::Capability::RESERVATION_REFINEMENT);
-  }
-
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  Offer offer = offers->front();
-
-  TaskInfo task;
-  task.set_name("");
-  task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offer.slave_id());
-  task.mutable_resources()->MergeFrom(offer.resources());
-  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  driver.launchTasks(offer.id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  Future<Response> response = process::http::get(
-      master.get()->pid,
-      "state",
-      None(),
-      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-
-  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
-  ASSERT_SOME(parse);
-
-  JSON::Value result = parse.get();
-
-  JSON::Object expected = {
-    {
-      "frameworks",
-      JSON::Array {
-        JSON::Object {
-          {
-            "tasks",
-            JSON::Array {
-              JSON::Object {
-                { "id", "1" },
-                { "role", frameworkInfo.roles(0) },
-                { "state", "TASK_STAGING" }
-              }
-            }
-          }
-        }
-      }
-    }
-  };
-
-  EXPECT_TRUE(result.contains(expected));
-
-  driver.stop();
-  driver.join();
-}
-
-
 // This test verifies that an operator can reserve and unreserve
 // resources through the master operator API in both
 // "(pre|post)-reservation-refinement" formats.
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index cdff370..4c84b1a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -846,92 +846,6 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
 }
 
 
-// This test ensures that reconciliation requests for tasks that are
-// pending are exposed in reconciliation.
-TEST_F(ReconciliationTest, PendingTask)
-{
-  MockAuthorizer authorizer;
-  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
-
-  // Wait for the slave to register and get the slave id.
-  AWAIT_READY(slaveRegisteredMessage);
-  const SlaveID slaveId = slaveRegisteredMessage->slave_id();
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  // Return a pending future from authorizer.
-  Future<Nothing> authorize;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorized(_))
-    .WillOnce(DoAll(FutureSatisfy(&authorize),
-                    Return(promise.future())));
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  driver.launchTasks(offers.get()[0].id(), {task});
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(authorize);
-
-  // First send an implicit reconciliation request for this task.
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  driver.reconcileTasks({});
-
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_STAGING, update->state());
-  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update->reason());
-  EXPECT_TRUE(update->has_slave_id());
-
-  // Now send an explicit reconciliation request for this task.
-  Future<TaskStatus> update2;
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&update2));
-
-  TaskStatus status;
-  status.mutable_task_id()->CopyFrom(task.task_id());
-  status.mutable_slave_id()->CopyFrom(slaveId);
-  status.set_state(TASK_STAGING); // Dummy value.
-
-  driver.reconcileTasks({status});
-
-  AWAIT_READY(update2);
-  EXPECT_EQ(TASK_STAGING, update2->state());
-  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update2->reason());
-  EXPECT_TRUE(update2->has_slave_id());
-
-  driver.stop();
-  driver.join();
-}
-
-
 // This test ensures that the master responds with the latest state
 // for tasks that are terminal at the master, but have not been
 // acknowledged by the framework. See MESOS-1389.