You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by as...@apache.org on 2020/03/03 07:52:50 UTC
[mesos] 03/08: Store per-framework ObjectApprovers.
This is an automated email from the ASF dual-hosted git repository.
asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b920f60bcee913319fb78427f28f6f5789729478
Author: Andrei Sekretenko <as...@mesosphere.com>
AuthorDate: Tue Jan 21 21:58:23 2020 +0100
Store per-framework ObjectApprovers.
This is a prerequisite to synchronous authorization of scheduler API
calls (see MESOS-10056).
Review: https://reviews.apache.org/r/72096
---
src/master/framework.cpp | 65 ++++++++++++--
src/master/master.cpp | 141 ++++++++++++++++++++-----------
src/master/master.hpp | 41 +++++++--
src/tests/master_authorization_tests.cpp | 89 +++++++++++--------
src/tests/master_load_tests.cpp | 23 ++++-
5 files changed, 256 insertions(+), 103 deletions(-)
diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index 85d9951..ffcf367 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -19,6 +19,13 @@
#include "common/heartbeater.hpp"
#include "common/protobuf_utils.hpp"
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::http::authentication::Principal;
+
+using mesos::authorization::ActionObject;
+
namespace mesos {
namespace internal {
namespace master {
@@ -28,8 +35,9 @@ Framework::Framework(
const Flags& masterFlags,
const FrameworkInfo& info,
const process::UPID& pid,
+ const Owned<ObjectApprovers>& approvers,
const process::Time& time)
- : Framework(master, masterFlags, info, CONNECTED, true, time)
+ : Framework(master, masterFlags, info, CONNECTED, true, approvers, time)
{
pid_ = pid;
}
@@ -40,8 +48,9 @@ Framework::Framework(
const Flags& masterFlags,
const FrameworkInfo& info,
const StreamingHttpConnection<v1::scheduler::Event>& http,
+ const Owned<ObjectApprovers>& approvers,
const process::Time& time)
- : Framework(master, masterFlags, info, CONNECTED, true, time)
+ : Framework(master, masterFlags, info, CONNECTED, true, approvers, time)
{
http_ = http;
}
@@ -51,7 +60,8 @@ Framework::Framework(
Master* const master,
const Flags& masterFlags,
const FrameworkInfo& info)
- : Framework(master, masterFlags, info, RECOVERED, false, process::Time())
+ : Framework(
+ master, masterFlags, info, RECOVERED, false, nullptr, process::Time())
{}
@@ -61,6 +71,7 @@ Framework::Framework(
const FrameworkInfo& _info,
State state,
bool active_,
+ const Owned<ObjectApprovers>& approvers,
const process::Time& time)
: master(_master),
info(_info),
@@ -72,7 +83,8 @@ Framework::Framework(
unreachableTasks(masterFlags.max_unreachable_tasks_per_framework),
metrics(_info, masterFlags.publish_per_framework_metrics),
active_(active_),
- state(state)
+ state(state),
+ objectApprovers(approvers)
{
CHECK(_info.has_id());
@@ -559,7 +571,9 @@ void Framework::update(const FrameworkInfo& newInfo)
}
-void Framework::updateConnection(const process::UPID& newPid)
+void Framework::updateConnection(
+ const process::UPID& newPid,
+ const Owned<ObjectApprovers>& objectApprovers_)
{
// Cleanup the old connection state if exists.
disconnect();
@@ -567,12 +581,14 @@ void Framework::updateConnection(const process::UPID& newPid)
// TODO(benh): unlink(oldPid);
pid_ = newPid;
+ objectApprovers = objectApprovers_;
setState(State::CONNECTED);
}
void Framework::updateConnection(
- const StreamingHttpConnection<v1::scheduler::Event>& newHttp)
+ const StreamingHttpConnection<v1::scheduler::Event>& newHttp,
+ const Owned<ObjectApprovers>& objectApprovers_)
{
// Note that master creates a new HTTP connection for every
// subscribe request, so 'newHttp' should always be different
@@ -587,6 +603,7 @@ void Framework::updateConnection(
CHECK_NONE(http_);
http_ = newHttp;
+ objectApprovers = objectApprovers_;
setState(State::CONNECTED);
}
@@ -620,6 +637,12 @@ bool Framework::disconnect()
http_ = None();
heartbeater.reset();
+
+ // `ObjectApprover`s are kept up-to-date by authorizer, which potentially
+ // entails continuous interaction with an external IAM. Hence, we do not
+ // want to keep them alive if there is no subscribed scheduler.
+ objectApprovers.reset();
+
setState(State::DISCONNECTED);
return true;
}
@@ -703,6 +726,36 @@ void Framework::setState(Framework::State _state)
metrics.subscribed = state == Framework::State::CONNECTED ? 1 : 0;
}
+
+Try<bool> Framework::approved(const ActionObject& actionObject) const
+{
+ CHECK(objectApprovers.get() != nullptr)
+ << "Framework " << *this << " has no ObjectApprovers"
+ << " (attempt to call approved() for a disconnected framework?)";
+
+ return objectApprovers->approved(
+ actionObject.action(),
+ actionObject.object().getOrElse(authorization::Object()));
+}
+
+
+constexpr std::initializer_list<authorization::Action> SCHEDULER_API_ACTIONS{
+ authorization::REGISTER_FRAMEWORK};
+
+
+Future<Owned<ObjectApprovers>> Framework::createObjectApprovers(
+ const Option<Authorizer*>& authorizer,
+ const FrameworkInfo& frameworkInfo)
+{
+ return ObjectApprovers::create(
+ authorizer,
+ frameworkInfo.has_principal()
+ ? Option<Principal>(frameworkInfo.principal())
+ : Option<Principal>::none(),
+ SCHEDULER_API_ACTIONS);
+}
+
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3c621e4..9d1e541 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2629,6 +2629,39 @@ Option<Error> Master::validateFramework(
}
+// Returns None if the framework object approvers are ready and the scheduler
+// trying to SUBSCRIBE is authorized to do so with provided framework info.
+// Otherwise, returns an error to be sent to the scheduler trying to subscribe.
+static Option<Error> checkSubscribeAuthorization(
+ const Future<Owned<ObjectApprovers>>& frameworkObjectApprovers,
+ const FrameworkInfo& frameworkInfo)
+{
+ if (frameworkObjectApprovers.isFailed()) {
+ return Error(
+ "Authorization failure: could not create ObjectApprovers for a "
+ "framework: " +
+ frameworkObjectApprovers.failure());
+ }
+
+ auto actionObject = ActionObject::frameworkRegistration(frameworkInfo);
+
+ CHECK(frameworkObjectApprovers.isReady());
+ Try<bool> approved = frameworkObjectApprovers.get()->approved(
+ actionObject.action(),
+ actionObject.object().getOrElse(authorization::Object()));
+
+ if (approved.isError()) {
+ return Error("Authorization failure: " + approved.error());
+ }
+
+ if (!*approved) {
+ return Error("Not authorized to " + stringify(actionObject));
+ }
+
+ return None();
+};
+
+
void Master::subscribe(
StreamingHttpConnection<v1::scheduler::Event> http,
scheduler::Call::Subscribe&& subscribe)
@@ -2669,11 +2702,12 @@ void Master::subscribe(
FrameworkInfo&&,
bool,
google::protobuf::RepeatedPtrField<string>&&,
- const Future<bool>&) = &Self::_subscribe;
+ const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe;
- Future<bool> authorized = authorizeFramework(frameworkInfo);
+ Future<Owned<ObjectApprovers>> objectApprovers =
+ Framework::createObjectApprovers(authorizer, frameworkInfo);
- authorized.onAny(
+ objectApprovers.onAny(
defer(self(),
_subscribe,
http,
@@ -2689,21 +2723,12 @@ void Master::_subscribe(
FrameworkInfo&& frameworkInfo,
bool force,
google::protobuf::RepeatedPtrField<string>&& suppressedRolesField,
- const Future<bool>& authorized)
+ const Future<Owned<ObjectApprovers>>& objectApprovers)
{
- CHECK(!authorized.isDiscarded());
-
- Option<Error> authorizationError = None();
-
- if (authorized.isFailed()) {
- authorizationError =
- Error("Authorization failure: " + authorized.failure());
- } else if (!authorized.get()) {
- authorizationError = Error(
- "Not authorized to use roles '" +
- stringify(protobuf::framework::getRoles(frameworkInfo)) + "'");
- }
+ CHECK(!objectApprovers.isDiscarded());
+ Option<Error> authorizationError =
+ checkSubscribeAuthorization(objectApprovers, frameworkInfo);
if (authorizationError.isSome()) {
LOG(INFO) << "Refusing subscription of framework"
<< " '" << frameworkInfo.name() << "'"
@@ -2716,6 +2741,8 @@ void Master::_subscribe(
return;
}
+ CHECK(objectApprovers.isReady());
+
LOG(INFO) << "Subscribing framework '" << frameworkInfo.name()
<< "' with checkpointing "
<< (frameworkInfo.checkpoint() ? "enabled" : "disabled")
@@ -2731,7 +2758,8 @@ void Master::_subscribe(
FrameworkInfo frameworkInfo_ = frameworkInfo;
frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
- Framework* framework = new Framework(this, flags, frameworkInfo_, http);
+ Framework* framework =
+ new Framework(this, flags, frameworkInfo_, http, objectApprovers.get());
addFramework(framework, suppressedRoles);
@@ -2798,11 +2826,16 @@ void Master::_subscribe(
framework->reregisteredTime = Clock::now();
// Always failover the old framework connection. See MESOS-4712 for details.
- failoverFramework(framework, http);
+ failoverFramework(framework, http, objectApprovers.get());
} else {
// The framework has not yet reregistered after master failover.
connectAndActivateRecoveredFramework(
- framework, frameworkInfo, None(), http, suppressedRoles);
+ framework,
+ frameworkInfo,
+ None(),
+ http,
+ objectApprovers.get(),
+ suppressedRoles);
}
sendFrameworkUpdates(*framework);
@@ -2906,18 +2939,19 @@ void Master::subscribe(
FrameworkInfo&&,
bool,
google::protobuf::RepeatedPtrField<string>&&,
- const Future<bool>&) = &Self::_subscribe;
+ const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe;
- Future<bool> authorized = authorizeFramework(frameworkInfo);
+ Future<Owned<ObjectApprovers>> objectApprovers =
+ Framework::createObjectApprovers(authorizer, frameworkInfo);
- authorized.onAny(
- defer(self(),
- _subscribe,
- from,
- std::move(frameworkInfo),
- subscribe.force(),
- std::move(*subscribe.mutable_suppressed_roles()),
- lambda::_1));
+ objectApprovers.onAny(defer(
+ self(),
+ _subscribe,
+ from,
+ std::move(frameworkInfo),
+ subscribe.force(),
+ std::move(*subscribe.mutable_suppressed_roles()),
+ lambda::_1));
}
@@ -2926,20 +2960,12 @@ void Master::_subscribe(
FrameworkInfo&& frameworkInfo,
bool force,
google::protobuf::RepeatedPtrField<string>&& suppressedRolesField,
- const Future<bool>& authorized)
+ const Future<Owned<ObjectApprovers>>& objectApprovers)
{
- CHECK(!authorized.isDiscarded());
+ CHECK(!objectApprovers.isDiscarded());
- Option<Error> authorizationError = None();
-
- if (authorized.isFailed()) {
- authorizationError =
- Error("Authorization failure: " + authorized.failure());
- } else if (!authorized.get()) {
- authorizationError = Error(
- "Not authorized to use roles '" +
- stringify(protobuf::framework::getRoles(frameworkInfo)) + "'");
- }
+ Option<Error> authorizationError =
+ checkSubscribeAuthorization(objectApprovers, frameworkInfo);
if (authorizationError.isSome()) {
LOG(INFO) << "Refusing subscription of framework"
@@ -2953,6 +2979,8 @@ void Master::_subscribe(
return;
}
+ CHECK(objectApprovers.isReady());
+
// At this point, authentications errors will be due to
// re-authentication during the authorization process,
// so we drop the subscription.
@@ -2997,7 +3025,8 @@ void Master::_subscribe(
// Assign a new FrameworkID.
frameworkInfo.mutable_id()->CopyFrom(newFrameworkId());
- Framework* framework = new Framework(this, flags, frameworkInfo, from);
+ Framework* framework =
+ new Framework(this, flags, frameworkInfo, from, objectApprovers.get());
addFramework(framework, suppressedRoles);
@@ -3099,7 +3128,7 @@ void Master::_subscribe(
// FrameworkReregisteredMessage back and activate the framework
// if necesssary.
LOG(INFO) << "Framework " << *framework << " failed over";
- failoverFramework(framework, from);
+ failoverFramework(framework, from, objectApprovers.get());
} else {
LOG(INFO) << "Allowing framework " << *framework
<< " to subscribe with an already used id";
@@ -3130,7 +3159,7 @@ void Master::_subscribe(
// framework link previously broke.
link(framework->pid().get());
- framework->updateConnection(*(framework->pid()));
+ framework->updateConnection(*(framework->pid()), objectApprovers.get());
if (framework->activate()) {
// The framework was not active and needs to be activated in allocator.
//
@@ -3147,7 +3176,12 @@ void Master::_subscribe(
} else {
// The framework has not yet reregistered after master failover.
connectAndActivateRecoveredFramework(
- framework, frameworkInfo, from, None(), suppressedRoles);
+ framework,
+ frameworkInfo,
+ from,
+ None(),
+ objectApprovers.get(),
+ suppressedRoles);
}
sendFrameworkUpdates(*framework);
@@ -10563,6 +10597,7 @@ void Master::connectAndActivateRecoveredFramework(
const FrameworkInfo& frameworkInfo,
const Option<UPID>& pid,
const Option<StreamingHttpConnection<v1::scheduler::Event>>& http,
+ const Owned<ObjectApprovers>& objectApprovers,
const set<string>& suppressedRoles)
{
// Exactly one of `pid` or `http` must be provided.
@@ -10587,10 +10622,10 @@ void Master::connectAndActivateRecoveredFramework(
// Update the framework's connection state.
if (pid.isSome()) {
- framework->updateConnection(pid.get());
+ framework->updateConnection(pid.get(), objectApprovers);
link(pid.get());
} else {
- framework->updateConnection(http.get());
+ framework->updateConnection(http.get(), objectApprovers);
http->closed()
.onAny(defer(self(), &Self::exited, framework->id(), http.get()));
}
@@ -10641,7 +10676,8 @@ void Master::connectAndActivateRecoveredFramework(
void Master::failoverFramework(
Framework* framework,
- const StreamingHttpConnection<v1::scheduler::Event>& http)
+ const StreamingHttpConnection<v1::scheduler::Event>& http,
+ const Owned<ObjectApprovers>& objectApprovers)
{
CHECK_NOTNULL(framework);
@@ -10665,7 +10701,7 @@ void Master::failoverFramework(
frameworks.principals.erase(framework->pid().get());
}
- framework->updateConnection(http);
+ framework->updateConnection(http, objectApprovers);
http.closed()
.onAny(defer(self(), &Self::exited, framework->id(), http));
@@ -10679,7 +10715,10 @@ void Master::failoverFramework(
// Replace the scheduler for a framework with a new process ID, in the
// event of a scheduler failover.
-void Master::failoverFramework(Framework* framework, const UPID& newPid)
+void Master::failoverFramework(
+ Framework* framework,
+ const UPID& newPid,
+ const Owned<ObjectApprovers>& objectApprovers)
{
CHECK_NOTNULL(framework);
@@ -10701,7 +10740,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
framework->send(message);
}
- framework->updateConnection(newPid);
+ framework->updateConnection(newPid, objectApprovers);
link(newPid);
_failoverFramework(framework);
diff --git a/src/master/master.hpp b/src/master/master.hpp
index f3239cd..e0ef1cd 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -640,17 +640,22 @@ protected:
const FrameworkInfo& frameworkInfo,
const Option<process::UPID>& pid,
const Option<StreamingHttpConnection<v1::scheduler::Event>>& http,
+ const process::Owned<ObjectApprovers>& objectApprovers,
const std::set<std::string>& suppressedRoles);
// Replace the scheduler for a framework with a new process ID, in
// the event of a scheduler failover.
- void failoverFramework(Framework* framework, const process::UPID& newPid);
+ void failoverFramework(
+ Framework* framework,
+ const process::UPID& newPid,
+ const process::Owned<ObjectApprovers>& objectApprovers);
// Replace the scheduler for a framework with a new HTTP connection,
// in the event of a scheduler failover.
void failoverFramework(
Framework* framework,
- const StreamingHttpConnection<v1::scheduler::Event>& http);
+ const StreamingHttpConnection<v1::scheduler::Event>& http,
+ const process::Owned<ObjectApprovers>& objectApprovers);
void _failoverFramework(Framework* framework);
@@ -919,7 +924,7 @@ private:
FrameworkInfo&& frameworkInfo,
bool force,
google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
- const process::Future<bool>& authorized);
+ const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
void subscribe(
const process::UPID& from,
@@ -930,7 +935,7 @@ private:
FrameworkInfo&& frameworkInfo,
bool force,
google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
- const process::Future<bool>& authorized);
+ const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
// Update framework via SchedulerDriver (i.e. no response
// code feedback, FrameworkErrorMessage on error).
@@ -2441,12 +2446,14 @@ struct Framework
const Flags& masterFlags,
const FrameworkInfo& info,
const process::UPID& _pid,
+ const process::Owned<ObjectApprovers>& objectApprovers,
const process::Time& time = process::Clock::now());
Framework(Master* const master,
const Flags& masterFlags,
const FrameworkInfo& info,
const StreamingHttpConnection<v1::scheduler::Event>& _http,
+ const process::Owned<ObjectApprovers>& objectApprovers,
const process::Time& time = process::Clock::now());
Framework(Master* const master,
@@ -2515,14 +2522,18 @@ struct Framework
// Reactivate framework with new connection: update connection-related state
// and mark the framework as CONNECTED, regardless of the previous state.
- void updateConnection(const process::UPID& newPid);
void updateConnection(
- const StreamingHttpConnection<v1::scheduler::Event>& newHttp);
+ const process::UPID& newPid,
+ const process::Owned<ObjectApprovers>& objectApprovers);
+
+ void updateConnection(
+ const StreamingHttpConnection<v1::scheduler::Event>& newHttp,
+ const process::Owned<ObjectApprovers>& objectApprovers);
// If the framework is CONNECTED, clear all state associated with
// the scheduler being connected (close http connection, stop heartbeater,
- // etc.), mark the framework DISCONNECTED and return `true`.
- // Otherwise, return `false`.
+ // clear object approvers, etc.), mark the framework DISCONNECTED and return
+ // `true`. Otherwise, return `false`.
bool disconnect();
// Mark the framework as active (eligible to receive offers if connected)
@@ -2548,6 +2559,16 @@ struct Framework
const Option<process::UPID>& pid() const { return pid_; }
+ // Returns ObjectApprovers for all actions
+ // needed to authorize scheduler API calls.
+ static process::Future<process::Owned<ObjectApprovers>> createObjectApprovers(
+ const Option<Authorizer*>& _authorizer,
+ const FrameworkInfo& frameworkInfo);
+
+ // Returns whether the framework principal is authorized to perform
+ // action on object.
+ Try<bool> approved(const authorization::ActionObject& actionObject) const;
+
Master* const master;
FrameworkInfo info;
@@ -2640,6 +2661,7 @@ private:
const FrameworkInfo& _info,
State state,
bool active,
+ const process::Owned<ObjectApprovers>& objectApprovers,
const process::Time& time);
Framework(const Framework&); // No copying.
@@ -2667,6 +2689,9 @@ private:
// This is only set for HTTP frameworks.
process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>>
heartbeater;
+
+ // ObjectApprovers for the framework's principal.
+ process::Owned<ObjectApprovers> objectApprovers;
};
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index bc8155b..4074a18 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -15,6 +15,7 @@
// limitations under the License.
#include <atomic>
+#include <memory>
#include <set>
#include <string>
#include <utility>
@@ -68,6 +69,8 @@
namespace http = process::http;
+using std::shared_ptr;
+
using google::protobuf::RepeatedPtrField;
using mesos::internal::master::Master;
@@ -102,6 +105,7 @@ using testing::An;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
+using testing::Ne;
using testing::Invoke;
using testing::Return;
using testing::Truly;
@@ -1084,6 +1088,10 @@ TEST_F(MasterAuthorizationTest, UnauthorizedRole)
driver.join();
}
+static shared_ptr<const ObjectApprover> getAcceptingObjectApprover()
+{
+ return std::make_shared<AcceptingObjectApprover>();
+}
// This test verifies that an authentication request that comes from
// the same instance of the framework (e.g., ZK blip) before
@@ -1108,15 +1116,18 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration)
// Return pending futures from authorizer.
Future<Nothing> authorize1;
- Promise<bool> promise1;
+ Promise<shared_ptr<const ObjectApprover>> promise1;
Future<Nothing> authorize2;
- Promise<bool> promise2;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize1),
- Return(promise1.future())))
- .WillOnce(DoAll(FutureSatisfy(&authorize2),
- Return(promise2.future())))
- .WillRepeatedly(Return(true)); // Authorize subsequent registration retries.
+ Promise<shared_ptr<const ObjectApprover>> promise2;
+
+ // Expect requests for two approvers for REGISTER_FRAMEWORK.
+ EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK))
+ .WillOnce(DoAll(FutureSatisfy(&authorize1), Return(promise1.future())))
+ .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future())));
+
+ // Handle requests for all other approvers.
+ EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK)))
+ .WillRepeatedly(Return(getAcceptingObjectApprover()));
// Pause the clock to avoid registration retries.
Clock::pause();
@@ -1133,7 +1144,7 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration)
AWAIT_READY(authorize2);
// Now complete the first authorization attempt.
- promise1.set(true);
+ promise1.set(getAcceptingObjectApprover());
// First registration request should succeed because the
// framework PID did not change.
@@ -1143,7 +1154,7 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration)
FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
// Now complete the second authorization attempt.
- promise2.set(true);
+ promise2.set(getAcceptingObjectApprover());
// Master should acknowledge the second registration attempt too.
AWAIT_READY(frameworkRegisteredMessage);
@@ -1176,16 +1187,16 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
// Return pending futures from authorizer after the first attempt.
Future<Nothing> authorize2;
- Promise<bool> promise2;
+ Promise<shared_ptr<const ObjectApprover>> promise2;
Future<Nothing> authorize3;
- Promise<bool> promise3;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(Return(true))
- .WillOnce(DoAll(FutureSatisfy(&authorize2),
- Return(promise2.future())))
- .WillOnce(DoAll(FutureSatisfy(&authorize3),
- Return(promise3.future())))
- .WillRepeatedly(Return(true)); // Authorize subsequent registration retries.
+ Promise<shared_ptr<const ObjectApprover>> promise3;
+ EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK))
+ .WillOnce(Return(getAcceptingObjectApprover()))
+ .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future())))
+ .WillOnce(DoAll(FutureSatisfy(&authorize3), Return(promise3.future())));
+
+ EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK)))
+ .WillRepeatedly(Return(getAcceptingObjectApprover()));
// Pause the clock to avoid re-registration retries.
Clock::pause();
@@ -1214,7 +1225,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
.WillOnce(FutureSatisfy(&reregistered));
// Now complete the second authorization attempt.
- promise2.set(true);
+ promise2.set(getAcceptingObjectApprover());
// First re-registration request should succeed because the
// framework PID did not change.
@@ -1224,7 +1235,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
FUTURE_PROTOBUF(FrameworkReregisteredMessage(), _, _);
// Now complete the third authorization attempt.
- promise3.set(true);
+ promise3.set(getAcceptingObjectApprover());
// Master should acknowledge the second re-registration attempt too.
AWAIT_READY(frameworkReregisteredMessage);
@@ -1235,7 +1246,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration)
// This test ensures that a framework that is removed while
-// authorization for registration is in progress is properly handled.
+// ObjectApprovers are being obtained during registration is properly handled.
TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
{
MockAuthorizer authorizer;
@@ -1248,11 +1259,13 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
// Return a pending future from authorizer.
Future<Nothing> authorize;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(DoAll(FutureSatisfy(&authorize),
- Return(promise.future())))
- .WillRepeatedly(Return(true)); // Authorize subsequent registration retries.
+ Promise<shared_ptr<const ObjectApprover>> promise;
+ EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK))
+ .WillRepeatedly(DoAll(FutureSatisfy(&authorize), Return(promise.future())));
+
+ EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK)))
+ .WillRepeatedly(Return(getAcceptingObjectApprover()));
+
// Pause the clock to avoid scheduler registration retries.
Clock::pause();
@@ -1276,8 +1289,8 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
Future<Nothing> removeFramework =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
- // Now complete authorization.
- promise.set(true);
+ // Now make all the returned approvers ready.
+ promise.set(getAcceptingObjectApprover());
// When the master tries to link to a non-existent framework PID
// it should realize the framework is gone and remove it.
@@ -1305,13 +1318,17 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(&registered));
- // Return a pending future from authorizer after first attempt.
+ // Return a pending future from the authorizer after the first request
+ // for a REGISTER_FRAMEWORK approver.
Future<Nothing> authorize2;
- Promise<bool> promise2;
- EXPECT_CALL(authorizer, authorized(_))
- .WillOnce(Return(true))
- .WillOnce(DoAll(FutureSatisfy(&authorize2),
- Return(promise2.future())));
+ Promise<shared_ptr<const ObjectApprover>> promise2;
+ EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK))
+ .WillOnce(Return(getAcceptingObjectApprover()))
+ .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future())));
+
+ // Handle all other actions.
+ EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK)))
+ .WillRepeatedly(Return(getAcceptingObjectApprover()));
// Pause the clock to avoid scheduler registration retries.
Clock::pause();
@@ -1344,7 +1361,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
AWAIT_READY(removeFramework);
// Now complete the second authorization attempt.
- promise2.set(true);
+ promise2.set(getAcceptingObjectApprover());
// Master should drop the second framework re-registration request
// because the framework PID was removed from 'authenticated' map.
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
index c7c1b12..f6656ff 100644
--- a/src/tests/master_load_tests.cpp
+++ b/src/tests/master_load_tests.cpp
@@ -122,7 +122,7 @@ public:
BlockingAuthorizerProcess(Authorizer* underlying)
: ProcessBase(process::ID::generate("blocking-authorizer")),
underlying_(underlying),
- blocked_(true) {}
+ blocked_(false) {}
Future<bool> authorized(const authorization::Request& request)
{
@@ -151,7 +151,14 @@ public:
return promises_.size();
}
- // Satisfies all future and prior calls made to `getApprover`.
+ Future<Nothing> block()
+ {
+ blocked_ = true;
+
+ return Nothing();
+ }
+
+ // Satisfies all future and pending calls made to `getApprover`.
Future<Nothing> unleash()
{
CHECK_EQ(promises_.size(), futures_.size());
@@ -217,6 +224,13 @@ public:
&BlockingAuthorizerProcess::pending);
}
+ Future<Nothing> block()
+ {
+ return process::dispatch(
+ process_.get(),
+ &BlockingAuthorizerProcess::block);
+ }
+
Future<Nothing> unleash()
{
return process::dispatch(
@@ -266,6 +280,11 @@ void MasterLoadTest::prepareCluster(Authorizer* authorizer)
slave_ = slave.get();
AWAIT_READY(slaveRegisteredMessage);
+
+ // NOTE: Authorizer is blocked after preparing the cluster, otherwise
+ // framework registration, which also uses `prepareObjectApprover(...)` will
+ // be blocked too.
+ authorizer_->block();
}