You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/03 07:52:53 UTC
[mesos] 06/08: Converted ACCEPT to synchronous authorization.
This is an automated email from the ASF dual-hosted git repository.
asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 495db0e983222a5038a26b93e0ca689f2ce009a0
Author: Andrei Sekretenko <as...@mesosphere.com>
AuthorDate: Wed Jan 29 15:24:32 2020 +0100
Converted ACCEPT to synchronous authorization.
This patch converts the ACCEPT call to synchronous authorization
(see MESOS-10056), thus fixing the race between ACCEPT and REVIVE
(MESOS-10023) and removing the potential for other similar races.
It also moves authorization of scheduler API operations after their
validation (thus fixing MESOS-10083) and effectively gets rid of the
concept of a "task pending authorization".
Tests are converted from mocking `Authorizer::authorized(...)`
to mocking `Authorizer::provideObjectApprover(...)` as necessary.
Review: https://reviews.apache.org/r/72098
---
src/master/authorization.cpp | 16 +-
src/master/framework.cpp | 16 +-
src/master/master.cpp | 478 +++++-----------------
src/master/master.hpp | 4 +-
src/tests/master_authorization_tests.cpp | 657 ++-----------------------------
src/tests/master_tests.cpp | 103 -----
src/tests/reconciliation_tests.cpp | 86 ----
7 files changed, 149 insertions(+), 1211 deletions(-)
diff --git a/src/master/authorization.cpp b/src/master/authorization.cpp
index 77719eb..6dfa59a 100644
--- a/src/master/authorization.cpp
+++ b/src/master/authorization.cpp
@@ -387,10 +387,20 @@ ostream& operator<<(ostream& stream, const ActionObject& actionObject)
}
switch (actionObject.action()) {
- case authorization::RUN_TASK:
+ case authorization::RUN_TASK: {
+ const TaskInfo& task = object->task_info();
+ const FrameworkInfo& framework = object->framework_info();
return stream
- << "launch task " << object->task_info().task_id()
- << " of framework " << object->framework_info().id();
+ << "launch task " << task.task_id()
+ << " of framework " << framework.id()
+ << " under user '"
+ << (task.has_command() && task.command().has_user()
+ ? task.command().user()
+ : task.has_executor() && task.executor().command().has_user()
+ ? task.executor().command().user()
+ : framework.user())
+ << "'";
+ }
case authorization::REGISTER_FRAMEWORK:
return stream
diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index ffcf367..7e46469 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -740,7 +740,21 @@ Try<bool> Framework::approved(const ActionObject& actionObject) const
constexpr std::initializer_list<authorization::Action> SCHEDULER_API_ACTIONS{
- authorization::REGISTER_FRAMEWORK};
+ authorization::REGISTER_FRAMEWORK,
+ authorization::RUN_TASK,
+
+ authorization::UNRESERVE_RESOURCES,
+ authorization::RESERVE_RESOURCES,
+
+ authorization::CREATE_VOLUME,
+ authorization::DESTROY_VOLUME,
+ authorization::RESIZE_VOLUME,
+
+ authorization::CREATE_MOUNT_DISK,
+ authorization::CREATE_BLOCK_DISK,
+ authorization::DESTROY_MOUNT_DISK,
+ authorization::DESTROY_BLOCK_DISK,
+ authorization::DESTROY_RAW_DISK};
Future<Owned<ObjectApprovers>> Framework::createObjectApprovers(
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 07685fe..b09ce8e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3721,8 +3721,6 @@ Future<bool> Master::authorize(
*request.mutable_object() = *(std::move(actionObject).object());
}
- // TODO(asekretenko): Use a background-refreshed ObjectApprover
- // when they become available (see MESOS-10056).
return authorizer.get()->authorized(request);
}
@@ -4317,126 +4315,18 @@ void Master::accept(
}
}
- const Option<Principal> principal = framework->info.has_principal()
- ? Principal(framework->info.principal())
- : Option<Principal>::none();
-
- // TODO(asekretenko): Use background-refreshed ObjectApprovers
- // instead of asynchronous authorization.
- vector<Future<bool>> futures;
- for (const Offer::Operation& operation : accept.operations()) {
- switch (operation.type()) {
- case Offer::Operation::LAUNCH:
- case Offer::Operation::LAUNCH_GROUP: {
- for (const TaskInfo& task : getOperationTasks(operation)) {
- futures.emplace_back(authorize(
- principal, ActionObject::taskLaunch(task, framework->info)));
- }
- break;
- }
-
- // NOTE: When handling RESERVE and UNRESERVE operations, authorization
- // will proceed even if no principal is specified, although currently
- // resources cannot be reserved or unreserved unless a principal is
- // provided. Any RESERVE/UNRESERVE operation with no associated principal
- // will be found invalid when `validate()` is called in `_accept()` below.
-
- // The RESERVE operation allows a principal to reserve resources.
- case Offer::Operation::RESERVE: {
- futures.push_back(authorize(
- principal, ActionObject::reserve(operation.reserve())));
-
- break;
- }
-
- // The UNRESERVE operation allows a principal to unreserve resources.
- case Offer::Operation::UNRESERVE: {
- futures.push_back(authorize(
- principal, ActionObject::unreserve(operation.unreserve())));
-
- break;
- }
-
- // The CREATE operation allows the creation of a persistent volume.
- case Offer::Operation::CREATE: {
- futures.push_back(authorize(
- principal, ActionObject::createVolume(operation.create())));
-
- break;
- }
-
- // The DESTROY operation allows the destruction of a persistent volume.
- case Offer::Operation::DESTROY: {
- futures.push_back(authorize(
- principal, ActionObject::destroyVolume(operation.destroy())));
-
- break;
- }
-
- case Offer::Operation::GROW_VOLUME: {
- futures.push_back(authorize(
- principal, ActionObject::growVolume(operation.grow_volume())));
-
- break;
- }
-
- case Offer::Operation::SHRINK_VOLUME: {
- futures.push_back(authorize(
- principal, ActionObject::shrinkVolume(operation.shrink_volume())));
-
- break;
- }
-
- case Offer::Operation::CREATE_DISK: {
- Try<ActionObject> actionObject =
- ActionObject::createDisk(operation.create_disk());
-
- if (actionObject.isError()) {
- futures.push_back(Failure(actionObject.error()));
- } else {
- futures.push_back(authorize(principal, std::move(*actionObject)));
- }
-
- break;
- }
-
- case Offer::Operation::DESTROY_DISK: {
- Try<ActionObject> actionObject =
- ActionObject::destroyDisk(operation.destroy_disk());
-
- if (actionObject.isError()) {
- futures.push_back(Failure(actionObject.error()));
- } else {
- futures.push_back(authorize(principal, std::move(*actionObject)));
- }
-
- break;
- }
-
- case Offer::Operation::UNKNOWN: {
- // TODO(vinod): Send an error event to the scheduler?
- LOG(WARNING) << "Ignoring unknown operation";
- break;
- }
- }
- }
-
- // Wait for all the tasks to be authorized.
- await(futures)
- .onAny(defer(self(),
- &Master::_accept,
- framework->id(),
- slaveId,
- std::move(accept),
- lambda::_1));
+ // TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous
+ // authorization was introduced, used to be a deferred continuation of ACCEPT
+ // call processing, but now is kept only for limiting variable scopes) and
+ // handle operations one-by-one.
+ _accept(framework->id(), slaveId, std::move(accept));
}
void Master::_accept(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- scheduler::Call::Accept&& accept,
- const Future<vector<Future<bool>>>& _authorizations)
+ scheduler::Call::Accept&& accept)
{
auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) {
for (const OfferID& offerId : ids) {
@@ -4583,56 +4473,60 @@ void Master::_accept(
// The order of the conversions is important and preserved.
vector<ResourceConversion> conversions;
- // The order of `authorizations` must match the order of the operations and/or
- // tasks in `accept.operations()` as they are iterated through simultaneously.
- CHECK_READY(_authorizations);
- std::deque<Future<bool>> authorizations(
- _authorizations->begin(), _authorizations->end());
-
foreach (const Offer::Operation& operation, accept.operations()) {
- switch (operation.type()) {
- // The RESERVE operation allows a principal to reserve resources.
- case Offer::Operation::RESERVE: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
+ auto authorized_ =
+ [&framework, &operation](const ActionObject& actionObject)
+ -> Option<Error> {
+ const Try<bool> authorized = framework->approved(actionObject);
+ if (authorized.isError()) {
+ return Error(
+ "Failed to authorize principal '" + framework->info.principal() +
+ "' to perform " + Offer::Operation::Type_Name(operation.type()) +
+ ": " + authorized.error());
+ }
- CHECK(!authorization.isDiscarded());
+ if (!*authorized) {
+ return Error(
+ "Principal '" + framework->info.principal() +
+ "' no authorized to " + stringify(actionObject));
+ }
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to reserve resources failed: " + authorization.failure());
+ return None();
+ };
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to reserve resources as '" +
- framework->info.principal() + "'");
+ auto authorized = overload(
+ authorized_,
+ [&authorized_](const vector<ActionObject>& actionObjects) {
+ for (const ActionObject& actionObject : actionObjects) {
+ const Option<Error> error = authorized_(actionObject);
+ if (error.isSome()) {
+ return error;
+ }
+ }
+
+ return Option<Error>::none();
+ });
- continue;
- }
+ switch (operation.type()) {
+ // The RESERVE operation allows a principal to reserve resources.
+ case Offer::Operation::RESERVE: {
Option<Principal> principal = framework->info.has_principal()
? Principal(framework->info.principal())
: Option<Principal>::none();
- // Make sure this reserve operation is valid.
Option<Error> error = validation::operation::validate(
operation.reserve(),
principal,
slave->capabilities,
framework->info);
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::reserve(operation.reserve()));
+
if (error.isSome()) {
- drop(
- framework,
- operation,
- error->message + "; on agent " + stringify(*slave));
+ drop(framework, operation, error->message);
continue;
}
@@ -4669,35 +4563,13 @@ void Master::_accept(
// The UNRESERVE operation allows a principal to unreserve resources.
case Offer::Operation::UNRESERVE: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to unreserve resources failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to unreserve resources as '" +
- framework->info.principal() + "'");
-
- continue;
- }
-
- // Make sure this unreserve operation is valid.
Option<Error> error =
validation::operation::validate(operation.unreserve());
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::unreserve(operation.unreserve()));
+
if (error.isSome()) {
drop(framework, operation, error->message);
continue;
@@ -4735,31 +4607,6 @@ void Master::_accept(
}
case Offer::Operation::CREATE: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to create persistent volumes failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to create persistent volumes as '" +
- framework->info.principal() + "'");
-
- continue;
- }
-
Option<Principal> principal = framework->info.has_principal()
? Principal(framework->info.principal())
: Option<Principal>::none();
@@ -4772,11 +4619,12 @@ void Master::_accept(
slave->capabilities,
framework->info);
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::createVolume(operation.create()));
+
if (error.isSome()) {
- drop(
- framework,
- operation,
- error->message + "; on agent " + stringify(*slave));
+ drop(framework, operation, error->message);
continue;
}
@@ -4813,38 +4661,16 @@ void Master::_accept(
}
case Offer::Operation::DESTROY: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to destroy persistent volumes failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to destroy persistent volumes as '" +
- framework->info.principal() + "'");
-
- continue;
- }
-
- // Make sure this destroy operation is valid.
Option<Error> error = validation::operation::validate(
operation.destroy(),
slave->checkpointedResources,
slave->usedResources,
slave->pendingTasks);
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::destroyVolume(operation.destroy()));
+
if (error.isSome()) {
drop(framework, operation, error->message);
continue;
@@ -4901,40 +4727,15 @@ void Master::_accept(
}
case Offer::Operation::GROW_VOLUME: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to grow a volume failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to grow a volume as '" +
- framework->info.principal() + "'");
-
- continue;
- }
-
- // Make sure this grow volume operation is valid.
Option<Error> error = validation::operation::validate(
operation.grow_volume(), slave->capabilities);
+ error = error.isSome() ?
+ Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::growVolume(operation.grow_volume()));
+
if (error.isSome()) {
- drop(
- framework,
- operation,
- error->message + "; on agent " + stringify(*slave));
+ drop(framework, operation, error->message);
continue;
}
@@ -4984,40 +4785,15 @@ void Master::_accept(
}
case Offer::Operation::SHRINK_VOLUME: {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to shrink a volume failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to shrink a volume as '" +
- framework->info.principal() + "'");
-
- continue;
- }
-
- // Make sure this shrink volume operation is valid.
Option<Error> error = validation::operation::validate(
operation.shrink_volume(), slave->capabilities);
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(ActionObject::shrinkVolume(operation.shrink_volume()));
+
if (error.isSome()) {
- drop(
- framework,
- operation,
- error->message + "; on agent " + stringify(*slave));
+ drop(framework, operation, error->message);
continue;
}
@@ -5068,10 +4844,6 @@ void Master::_accept(
case Offer::Operation::LAUNCH: {
foreach (const TaskInfo& task, operation.launch().task_infos()) {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
// The task will not be in `pendingTasks` if it has been
// killed in the interim. No need to send TASK_KILLED in
// this case as it has already been sent. Note however that
@@ -5084,7 +4856,10 @@ void Master::_accept(
// TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED
// if a task was killed (removed from `pendingTasks`) *and*
// the task is invalid or unauthorized here.
-
+ //
+ // TODO(asekretenko): Now that ACCEPT is authorized synchronously,
+ // master state cannot change while the task is being authorized,
+ // and all the code for tracking pending tasks can be removed.
bool pending = framework->pendingTasks.contains(task.task_id());
framework->pendingTasks.erase(task.task_id());
slave->pendingTasks[framework->id()].erase(task.task_id());
@@ -5092,17 +4867,10 @@ void Master::_accept(
slave->pendingTasks.erase(framework->id());
}
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed() || !authorization.get()) {
- string user = framework->info.user(); // Default user.
- if (task.has_command() && task.command().has_user()) {
- user = task.command().user();
- } else if (task.has_executor() &&
- task.executor().command().has_user()) {
- user = task.executor().command().user();
- }
+ const Option<Error> authorizationError =
+ authorized(ActionObject::taskLaunch(task, framework->info));
+ if (authorizationError.isSome()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id(),
task.slave_id(),
@@ -5110,9 +4878,7 @@ void Master::_accept(
TASK_ERROR,
TaskStatus::SOURCE_MASTER,
None(),
- authorization.isFailed() ?
- "Authorization failure: " + authorization.failure() :
- "Not authorized to launch as user '" + user + "'",
+ authorizationError->message,
TaskStatus::REASON_TASK_UNAUTHORIZED);
metrics->tasks_error++;
@@ -5315,29 +5081,19 @@ void Master::_accept(
Option<Error> error;
Option<TaskStatus::Reason> reason;
- // NOTE: We check for the authorization errors first and never break the
- // loop to ensure that all authorization futures for this task group are
- // iterated through.
foreach (const TaskInfo& task, taskGroup.tasks()) {
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
+ const ActionObject actionObject =
+ ActionObject::taskLaunch(task, framework->info);
- CHECK(!authorization.isDiscarded());
+ const Try<bool> approval = framework->approved(actionObject);
- if (authorization.isFailed()) {
+ if (approval.isError()) {
error = Error("Failed to authorize task"
" '" + stringify(task.task_id()) + "'"
- ": " + authorization.failure());
- } else if (!authorization.get()) {
- string user = framework->info.user(); // Default user.
- if (task.has_command() && task.command().has_user()) {
- user = task.command().user();
- }
-
+ ": " + approval.error());
+ } else if (!*approval) {
error = Error("Task '" + stringify(task.task_id()) + "'"
- " is not authorized to launch as"
- " user '" + user + "'");
+ " is not authorized to" + stringify(actionObject));
}
}
@@ -5498,34 +5254,6 @@ void Master::_accept(
}
case Offer::Operation::CREATE_DISK: {
- const Resource::DiskInfo::Source::Type diskType =
- operation.create_disk().target_type();
-
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to create a " + stringify(diskType) + " disk failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to create a " + stringify(diskType) +
- " disk as '" + framework->info.principal() + "'");
-
- continue;
- }
-
if (!slave->capabilities.resourceProvider) {
drop(framework,
operation,
@@ -5537,6 +5265,11 @@ void Master::_accept(
Option<Error> error = validation::operation::validate(
operation.create_disk());
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(CHECK_NOTERROR(
+ ActionObject::createDisk(operation.create_disk())));
+
if (error.isSome()) {
drop(framework, operation, error->message);
continue;
@@ -5565,34 +5298,6 @@ void Master::_accept(
}
case Offer::Operation::DESTROY_DISK: {
- const Resource::DiskInfo::Source::Type diskType =
- operation.destroy_disk().source().disk().source().type();
-
- CHECK(!authorizations.empty());
- Future<bool> authorization = authorizations.front();
- authorizations.pop_front();
-
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed()) {
- // TODO(greggomann): We may want to retry this failed authorization
- // request rather than dropping it immediately.
- drop(framework,
- operation,
- "Authorization of principal '" + framework->info.principal() +
- "' to destroy a " + stringify(diskType) + " disk failed: " +
- authorization.failure());
-
- continue;
- } else if (!authorization.get()) {
- drop(framework,
- operation,
- "Not authorized to destroy a " + stringify(diskType) +
- " disk as '" + framework->info.principal() + "'");
-
- continue;
- }
-
if (!slave->capabilities.resourceProvider) {
drop(framework,
operation,
@@ -5604,6 +5309,12 @@ void Master::_accept(
Option<Error> error = validation::operation::validate(
operation.destroy_disk());
+ error = error.isSome()
+ ? Error(error->message + "; on agent " + stringify(*slave))
+ : authorized(CHECK_NOTERROR(
+ ActionObject::destroyDisk(operation.destroy_disk())));
+
+
if (error.isSome()) {
drop(framework, operation, error->message);
continue;
@@ -5638,11 +5349,6 @@ void Master::_accept(
}
}
- CHECK(authorizations.empty())
- << "Authorization results not processed: "
- << stringify(
- vector<Future<bool>>(authorizations.begin(), authorizations.end()));
-
// Update the allocator based on the operations.
if (!conversions.empty()) {
allocator->updateAllocation(
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f766b5c..7281815 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -956,9 +956,7 @@ private:
void _accept(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- mesos::scheduler::Call::Accept&& accept,
- const process::Future<
- std::vector<process::Future<bool>>>& authorizations);
+ mesos::scheduler::Call::Accept&& accept);
void acceptInverseOffers(
Framework* framework,
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 4074a18..2d8fa7c 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -390,628 +390,6 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTaskGroup)
driver.join();
}
-
-// This test verifies that a 'killTask()' that comes before
-// '_accept()' is called results in TASK_KILLED.
-TEST_F(MasterAuthorizationTest, KillTask)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- driver.launchTasks(offers.get()[0].id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
-
- Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
- // Now kill the task.
- driver.killTask(task.task_id());
-
- // Framework should get a TASK_KILLED right away.
- AWAIT_READY(status);
- EXPECT_EQ(TASK_KILLED, status->state());
- EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason());
-
- // Now complete authorization.
- promise.set(true);
-
- // No task launch should happen resulting in all resources being
- // returned to the allocator.
- AWAIT_READY(recoverResources);
-
- // Make sure the task is not known to master anymore.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- driver.reconcileTasks({});
-
- // We settle the clock here to ensure any updates sent by the master
- // are received. There shouldn't be any updates in this case.
- Clock::pause();
- Clock::settle();
-
- driver.stop();
- driver.join();
-}
-
-
-// This test verifies that if a pending task in a task group
-// is killed, then the entire group will be killed.
-TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- Future<FrameworkID> frameworkId;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureArg<1>(&frameworkId));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(frameworkId);
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- Resources resources =
- Resources::parse("cpus:0.1;mem:32;disk:32").get();
-
- ExecutorInfo executor;
- executor.set_type(ExecutorInfo::DEFAULT);
- executor.mutable_executor_id()->set_value("E");
- executor.mutable_framework_id()->CopyFrom(frameworkId.get());
- executor.mutable_resources()->CopyFrom(resources);
-
- TaskInfo task1;
- task1.set_name("1");
- task1.mutable_task_id()->set_value("1");
- task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task1.mutable_resources()->MergeFrom(resources);
-
- TaskInfo task2;
- task2.set_name("2");
- task2.mutable_task_id()->set_value("2");
- task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task2.mutable_resources()->MergeFrom(resources);
-
- TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(task1);
- taskGroup.add_tasks()->CopyFrom(task2);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize1;
- Future<Nothing> authorize2;
- Promise<bool> promise1;
- Promise<bool> promise2;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize1),
- Return(promise1.future())))
- .WillOnce(DoAll(FutureSatisfy(&authorize2),
- Return(promise2.future())));
-
- Future<TaskStatus> task1Status;
- Future<TaskStatus> task2Status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&task1Status))
- .WillOnce(FutureArg<1>(&task2Status));
-
- Offer::Operation operation;
- operation.set_type(Offer::Operation::LAUNCH_GROUP);
-
- Offer::Operation::LaunchGroup* launchGroup =
- operation.mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executor);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
- driver.acceptOffers({offers.get()[0].id()}, {operation});
-
- // Wait until all authorizations are in progress.
- AWAIT_READY(authorize1);
- AWAIT_READY(authorize2);
-
- Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
- // Now kill task1.
- driver.killTask(task1.task_id());
-
- AWAIT_READY(task1Status);
- EXPECT_EQ(TASK_KILLED, task1Status->state());
- EXPECT_TRUE(strings::contains(
- task1Status->message(), "Killed before delivery to the agent"));
- EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
- task1Status->reason());
-
- // Now complete authorizations for task1 and task2.
- promise1.set(true);
- promise2.set(true);
-
- AWAIT_READY(task2Status);
- EXPECT_EQ(TASK_KILLED, task2Status->state());
- EXPECT_TRUE(strings::contains(
- task2Status->message(),
- "A task within the task group was killed before delivery to the agent"));
- EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
- task2Status->reason());
-
- // No task launch should happen resulting in all resources being
- // returned to the allocator.
- AWAIT_READY(recoverResources);
-
- // Make sure the task group is not known to master anymore.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- driver.reconcileTasks({});
-
- // We settle the clock here to ensure any updates sent by the master
- // are received. There shouldn't be any updates in this case.
- Clock::pause();
- Clock::settle();
-
- driver.stop();
- driver.join();
-}
-
-
-// This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_LOST for a framework that is
-// not partition-aware.
-TEST_F(MasterAuthorizationTest, SlaveRemovedLost)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- driver.launchTasks(offers.get()[0].id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<Nothing> slaveLost;
- EXPECT_CALL(sched, slaveLost(&driver, _))
- .WillOnce(FutureSatisfy(&slaveLost));
-
- Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
- // Stop the slave with explicit shutdown as otherwise with
- // checkpointing the master will wait for the slave to reconnect.
- slave.get()->shutdown();
- slave->reset();
-
- AWAIT_READY(slaveLost);
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
-
- // Now complete authorization.
- promise.set(true);
-
- // Framework should get a TASK_LOST.
- AWAIT_READY(status);
-
- EXPECT_EQ(TASK_LOST, status->state());
- EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source());
- EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason());
-
- // No task launch should happen resulting in all resources being
- // returned to the allocator.
- AWAIT_READY(recoverResources);
-
- // Check metrics.
- JSON::Object stats = Metrics();
- EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
- EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
- EXPECT_EQ(
- 1u, stats.values["master/task_lost/source_master/reason_slave_removed"]);
-
- // Make sure the task is not known to master anymore.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- driver.reconcileTasks({});
-
- // We settle the clock here to ensure any updates sent by the master
- // are received. There shouldn't be any updates in this case.
- Clock::pause();
- Clock::settle();
-
- driver.stop();
- driver.join();
-}
-
-
-// This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_DROPPED for a framework that
-// is partition-aware.
-TEST_F(MasterAuthorizationTest, SlaveRemovedDropped)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
- ASSERT_SOME(slave);
-
- FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.add_capabilities()->set_type(
- FrameworkInfo::Capability::PARTITION_AWARE);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- driver.launchTasks(offers.get()[0].id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<Nothing> slaveLost;
- EXPECT_CALL(sched, slaveLost(&driver, _))
- .WillOnce(FutureSatisfy(&slaveLost));
-
- Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
- // Stop the slave with explicit shutdown as otherwise with
- // checkpointing the master will wait for the slave to reconnect.
- slave.get()->shutdown();
- slave->reset();
-
- AWAIT_READY(slaveLost);
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
-
- // Now complete authorization.
- promise.set(true);
-
- // Framework should get a TASK_DROPPED.
- AWAIT_READY(status);
-
- EXPECT_EQ(TASK_DROPPED, status->state());
- EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source());
- EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason());
-
- // No task launch should happen resulting in all resources being
- // returned to the allocator.
- AWAIT_READY(recoverResources);
-
- // Check metrics.
- JSON::Object stats = Metrics();
- EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
- EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
- EXPECT_EQ(
- 1u,
- stats.values["master/task_dropped/source_master/reason_slave_removed"]);
-
- // Make sure the task is not known to master anymore.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- driver.reconcileTasks({});
-
- // We settle the clock here to ensure any updates sent by the master
- // are received. There shouldn't be any updates in this case.
- Clock::pause();
- Clock::settle();
-
- driver.stop();
- driver.join();
-}
-
-
-// This test verifies that a framework removal that comes before
-// '_accept()' is called results in recovery of resources.
-TEST_F(MasterAuthorizationTest, FrameworkRemoved)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- driver.launchTasks(offers.get()[0].id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
-
- Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
-
- // Now stop the framework.
- driver.stop();
- driver.join();
-
- AWAIT_READY(removeFramework);
-
- // Now complete authorization.
- promise.set(true);
-
- // No task launch should happen resulting in all resources being
- // returned to the allocator.
- AWAIT_READY(recoverResources);
-}
-
-
-// This test verifies that two tasks each launched on a different
-// slave with same executor id but different executor info are
-// allowed even when the first task is pending due to authorization.
-TEST_F(MasterAuthorizationTest, PendingExecutorInfoDiffersOnDifferentSlaves)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- Future<Nothing> registered;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureSatisfy(&registered));
-
- driver.start();
-
- AWAIT_READY(registered);
-
- Future<vector<Offer>> offers1;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers1));
-
- // Start the first slave.
- MockExecutor exec1(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer1(&exec1);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave1 =
- StartSlave(detector.get(), &containerizer1);
- ASSERT_SOME(slave1);
-
- AWAIT_READY(offers1);
- ASSERT_FALSE(offers1->empty());
-
- // Launch the first task with the default executor id.
- ExecutorInfo executor1;
- executor1 = DEFAULT_EXECUTOR_INFO;
- executor1.mutable_command()->set_value("exit 1");
-
- TaskInfo task1 = createTask(
- offers1.get()[0], executor1.command().value(), executor1.executor_id());
-
- // Return a pending future from authorizer.
- // Note that we retire this expectation after its use because
- // the authorizer will next be called when `slave2` registers and
- // this expectation would be hit again (and be oversaturated) if
- // we don't retire. New expectations on `authorizer` will be set
- // after `slave2` is registered.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())))
- .RetiresOnSaturation();
-
- driver.launchTasks(offers1.get()[0].id(), {task1});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<vector<Offer>> offers2;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers2))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- // Now start the second slave.
- MockExecutor exec2(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer2(&exec2);
-
- Try<Owned<cluster::Slave>> slave2 =
- StartSlave(detector.get(), &containerizer2);
- ASSERT_SOME(slave2);
-
- AWAIT_READY(offers2);
- ASSERT_FALSE(offers2->empty());
-
- // Now launch the second task with the same executor id but
- // a different executor command.
- ExecutorInfo executor2;
- executor2 = executor1;
- executor2.mutable_command()->set_value("exit 2");
-
- TaskInfo task2 = createTask(
- offers2.get()[0], executor2.command().value(), executor2.executor_id());
-
- EXPECT_CALL(exec2, registered(_, _, _, _));
-
- EXPECT_CALL(exec2, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
- Future<TaskStatus> status2;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status2));
-
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(Return(true));
-
- driver.launchTasks(offers2.get()[0].id(), {task2});
-
- AWAIT_READY(status2);
- ASSERT_EQ(TASK_RUNNING, status2->state());
-
- EXPECT_CALL(exec1, registered(_, _, _, _));
-
- EXPECT_CALL(exec1, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
- Future<TaskStatus> status1;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status1));
-
- // Complete authorization of 'task1'.
- promise.set(true);
-
- AWAIT_READY(status1);
- ASSERT_EQ(TASK_RUNNING, status1->state());
-
- EXPECT_CALL(exec1, shutdown(_))
- .Times(AtMost(1));
-
- EXPECT_CALL(exec2, shutdown(_))
- .Times(AtMost(1));
-
- driver.stop();
- driver.join();
-}
-
-
// This test verifies that a framework registration with authorized
// role is successful.
TEST_F(MasterAuthorizationTest, AuthorizedRole)
@@ -3206,6 +2584,23 @@ public:
};
+class ControllableObjectApprover : public ObjectApprover
+{
+public:
+ ControllableObjectApprover(bool permissive_) : permissive(permissive_) {}
+ void disable() { permissive.store(false); }
+
+ Try<bool> approved(
+ const Option<ObjectApprover::Object>&) const noexcept override
+ {
+ return permissive.load();
+ }
+
+private:
+ std::atomic_bool permissive;
+};
+
+
INSTANTIATE_TEST_CASE_P(
AllowedAction,
MasterOperationAuthorizationTest,
@@ -3232,14 +2627,18 @@ TEST_P(MasterOperationAuthorizationTest, Accept)
{
Clock::pause();
- // We use this flag to control when the mock authorizer starts to deny
- // disallowed actions.
- std::atomic_bool permissive(true);
+ const auto controllableApprover =
+ std::make_shared<ControllableObjectApprover>(true);
MockAuthorizer authorizer;
- EXPECT_CALL(authorizer, authorized(_))
- .WillRepeatedly(Invoke([&](const authorization::Request& request) {
- return permissive || request.action() == GetParam();
+
+ const authorization::Action allowedAction = GetParam();
+ EXPECT_CALL(authorizer, getApprover(_, _))
+ .WillRepeatedly(Invoke([controllableApprover, allowedAction](
+ const Option<authorization::Subject>&,
+ const authorization::Action& action) {
+ return action == allowedAction ? getAcceptingObjectApprover()
+ : controllableApprover;
}));
Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -3493,7 +2892,7 @@ TEST_P(MasterOperationAuthorizationTest, Accept)
EXPECT_EQ(frameworkId, subscribed->framework_id());
// Start to deny disallowed actions.
- permissive = false;
+ controllableApprover->disable();
AWAIT_READY(offers);
ASSERT_EQ(1, offers->offers_size());
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 9688f5f..c47d4c3 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -10572,109 +10572,6 @@ TEST_P(MasterTestPrePostReservationRefinement,
}
-// This test verifies that hitting the `/state` endpoint before '_accept()'
-// is called results in pending tasks being reported correctly.
-TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks)
-{
- FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
-
- // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
- // is removed from `DEFAULT_FRAMEWORK_INFO`.
- frameworkInfo.clear_capabilities();
- frameworkInfo.add_capabilities()->set_type(
- FrameworkInfo::Capability::MULTI_ROLE);
-
- if (GetParam()) {
- frameworkInfo.add_capabilities()->set_type(
- FrameworkInfo::Capability::RESERVATION_REFINEMENT);
- }
-
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- Offer offer = offers->front();
-
- TaskInfo task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offer.slave_id());
- task.mutable_resources()->MergeFrom(offer.resources());
- task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- driver.launchTasks(offer.id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- Future<Response> response = process::http::get(
- master.get()->pid,
- "state",
- None(),
- createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
- ASSERT_SOME(parse);
-
- JSON::Value result = parse.get();
-
- JSON::Object expected = {
- {
- "frameworks",
- JSON::Array {
- JSON::Object {
- {
- "tasks",
- JSON::Array {
- JSON::Object {
- { "id", "1" },
- { "role", frameworkInfo.roles(0) },
- { "state", "TASK_STAGING" }
- }
- }
- }
- }
- }
- }
- };
-
- EXPECT_TRUE(result.contains(expected));
-
- driver.stop();
- driver.join();
-}
-
-
// This test verifies that an operator can reserve and unreserve
// resources through the master operator API in both
// "(pre|post)-reservation-refinement" formats.
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index cdff370..4c84b1a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -846,92 +846,6 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
}
-// This test ensures that reconciliation requests for tasks that are
-// pending are exposed in reconciliation.
-TEST_F(ReconciliationTest, PendingTask)
-{
- MockAuthorizer authorizer;
- Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
- ASSERT_SOME(slave);
-
- // Wait for the slave to register and get the slave id.
- AWAIT_READY(slaveRegisteredMessage);
- const SlaveID slaveId = slaveRegisteredMessage->slave_id();
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- // Return a pending future from authorizer.
- Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())));
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
- driver.launchTasks(offers.get()[0].id(), {task});
-
- // Wait until authorization is in progress.
- AWAIT_READY(authorize);
-
- // First send an implicit reconciliation request for this task.
- Future<TaskStatus> update;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&update));
-
- driver.reconcileTasks({});
-
- AWAIT_READY(update);
- EXPECT_EQ(TASK_STAGING, update->state());
- EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update->reason());
- EXPECT_TRUE(update->has_slave_id());
-
- // Now send an explicit reconciliation request for this task.
- Future<TaskStatus> update2;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&update2));
-
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(task.task_id());
- status.mutable_slave_id()->CopyFrom(slaveId);
- status.set_state(TASK_STAGING); // Dummy value.
-
- driver.reconcileTasks({status});
-
- AWAIT_READY(update2);
- EXPECT_EQ(TASK_STAGING, update2->state());
- EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update2->reason());
- EXPECT_TRUE(update2->has_slave_id());
-
- driver.stop();
- driver.join();
-}
-
-
// This test ensures that the master responds with the latest state
// for tasks that are terminal at the master, but have not been
// acknowledged by the framework. See MESOS-1389.