You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/11 01:33:46 UTC
[1/7] git commit: Authorized resource roles to receive offers from.
Repository: mesos
Updated Branches:
refs/heads/vinod/authorize_tasks [created] 57f352e20
Authorized resource roles to receive offers from.
Review: https://reviews.apache.org/r/22284
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57f352e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57f352e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57f352e2
Branch: refs/heads/vinod/authorize_tasks
Commit: 57f352e20a85e077321bdc9973f33d0464917146
Parents: c408455
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 4 16:08:30 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/master/flags.hpp | 3 +-
src/master/master.cpp | 154 ++++++++++--
src/master/master.hpp | 25 +-
src/tests/master_authorization_tests.cpp | 347 ++++++++++++++++++++++++++
4 files changed, 495 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 4863359..ed08c6b 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -175,9 +175,10 @@ public:
"Human readable name for the cluster,\n"
"displayed in the webui.");
+ // TODO(vinod): Deprecate this in favor of '--acls'.
add(&Flags::roles,
"roles",
- "A comma seperated list of the allocation\n"
+ "A comma separated list of the allocation\n"
"roles that frameworks in this cluster may\n"
"belong to.");
http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1a2b219..4a01b1a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -990,7 +990,18 @@ void Master::detected(const Future<Option<MasterInfo> >& _leader)
}
-Try<Nothing> Master::validate(
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+ if (authorized) {
+ return None();
+ }
+
+ return Error(message);
+}
+
+
+Future<Option<Error> > Master::validate(
const FrameworkInfo& frameworkInfo,
const UPID& from)
{
@@ -1016,11 +1027,33 @@ Try<Nothing> Master::validate(
}
}
+ // TODO(vinod): Deprecate this in favor of ACLs.
if (!roles.contains(frameworkInfo.role())) {
- return Error("Role '" + frameworkInfo.role() + "' is not valid.");
+ return Error("Role '" + frameworkInfo.role() + "' is invalid");
}
- return Nothing();
+ if (authorizer.isNone()) {
+ // Authorization is disabled.
+ return None();
+ }
+
+ LOG(INFO)
+ << "Authorizing framework principal '" << frameworkInfo.principal()
+ << "' to receive offers for role '" << frameworkInfo.role() << "'";
+
+ mesos::ACL::ReceiveOffers request;
+ if (frameworkInfo.has_principal()) {
+ request.mutable_principals()->add_values(frameworkInfo.principal());
+ } else {
+ // Framework doesn't have a principal set.
+ request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+ }
+ request.mutable_roles()->add_values(frameworkInfo.role());
+
+ return authorizer.get()->authorize(request).then(
+ lambda::bind(&_authorize,
+ "Not authorized to use role '" + frameworkInfo.role() + "'",
+ lambda::_1));
}
@@ -1031,6 +1064,11 @@ void Master::registerFramework(
++metrics.messages_register_framework;
if (authenticating.contains(from)) {
+ // TODO(vinod): Consider dropping this request and fix the tests
+ // to deal with the drop. Currently there is a race between master
+ // realizing the framework is authenticated and framework sending
+ // a registration request. Dropping this message will cause the
+ // framework to retry slowing down the tests.
LOG(INFO) << "Queuing up registration request from " << from
<< " because authentication is still in progress";
@@ -1039,17 +1077,48 @@ void Master::registerFramework(
return;
}
- Try<Nothing> valid = validate(frameworkInfo, from);
- if (valid.isError()) {
- LOG(WARNING) << "Refusing registration of framework at " << from << ": "
- << valid.error();
+ LOG(INFO) << "Received registration request from " << from;
+
+ validate(frameworkInfo, from)
+ .onAny(defer(self(),
+ &Master::_registerFramework,
+ from,
+ frameworkInfo,
+ lambda::_1));
+}
+
+
+void Master::_registerFramework(
+ const UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const Future<Option<Error> >& validationError)
+{
+ CHECK_READY(validationError);
+ if (validationError.get().isSome()) {
+ LOG(INFO) << "Refusing registration of framework at " << from << ": "
+ << validationError.get().get().message;
+
FrameworkErrorMessage message;
- message.set_message(valid.error());
+ message.set_message(validationError.get().get().message);
send(from, message);
return;
}
- LOG(INFO) << "Received registration request from " << from;
+ if (authenticating.contains(from)) {
+ // This could happen if a new authentication request came from the
+ // same framework while validation was in progress.
+ LOG(INFO) << "Dropping registration request from " << from
+ << " because new authentication attempt is in progress";
+ return;
+ }
+
+ if (flags.authenticate_frameworks && !authenticated.contains(from)) {
+ // This could happen if another (failed over) framework
+ // authenticated while validation was in progress.
+ LOG(INFO) << "Dropping registration request from " << from
+ << " because it is not authenticated";
+ return;
+ }
// Check if this framework is already registered (because it retries).
foreachvalue (Framework* framework, frameworks.activated) {
@@ -1069,6 +1138,7 @@ void Master::registerFramework(
LOG(INFO) << "Registering framework " << framework->id << " at " << from;
+ // TODO(vinod): Deprecate this in favor of authorization.
bool rootSubmissions = flags.root_submissions;
if (framework->info.user() == "root" && rootSubmissions == false) {
@@ -1095,7 +1165,9 @@ void Master::reregisterFramework(
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up re-registration request from " << from
<< " because authentication is still in progress";
-
+ // TODO(vinod): Consider dropping this request and fix the tests
+ // to deal with the drop. See 'Master::registerFramework()' for
+ // more details.
authenticating[from]
.onReady(defer(self(),
&Self::reregisterFramework,
@@ -1113,16 +1185,57 @@ void Master::reregisterFramework(
return;
}
- Try<Nothing> valid = validate(frameworkInfo, from);
- if (valid.isError()) {
- LOG(WARNING) << "Refusing re-registration of framework at " << from << ": "
- << valid.error();
+ LOG(INFO) << "Received re-registration request from framework "
+ << frameworkInfo.id() << " at " << from;
+
+ validate(frameworkInfo, from)
+ .onAny(defer(self(),
+ &Master::_reregisterFramework,
+ from,
+ frameworkInfo,
+ failover,
+ lambda::_1));
+}
+
+
+void Master::_reregisterFramework(
+ const UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ bool failover,
+ const Future<Option<Error> >& validationError)
+{
+ CHECK_READY(validationError);
+ if (validationError.get().isSome()) {
+ LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id()
+ << " at " << from << ": " << validationError.get().get().message;
+
FrameworkErrorMessage message;
- message.set_message(valid.error());
+ message.set_message(validationError.get().get().message);
send(from, message);
return;
}
+ if (authenticating.contains(from)) {
+ // This could happen if a new authentication request came from the
+ // same framework while validation was in progress.
+ LOG(INFO) << "Dropping re-registration request of framework "
+ << frameworkInfo.id() << " at " << from
+ << " because new authentication attempt is in progress";
+ return;
+ }
+
+ if (flags.authenticate_frameworks && !authenticated.contains(from)) {
+ // This could happen if another (failed over) framework
+ // authenticated while validation was in progress. It is important
+ // to drop this because if this request is from a failing over
+ // framework (pid = from) we don't want to failover the already
+ // registered framework (pid = framework->pid).
+ LOG(INFO) << "Dropping re-registration request of framework "
+ << frameworkInfo.id() << " at " << from
+ << " because it is not authenticated";
+ return;
+ }
+
LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
<< " at " << from;
@@ -1879,17 +1992,6 @@ void Master::launchTasks(
}
-// Helper to convert authorization result to Future<Option<Error> >.
-static Future<Option<Error> > _authorize(const string& message, bool authorized)
-{
- if (authorized) {
- return None();
- }
-
- return Error(message);
-}
-
-
Future<Option<Error> > Master::validateTask(
const TaskInfo& task,
Framework* framework,
http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0c68a5b..7a12185 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -275,6 +275,19 @@ protected:
const std::vector<ExecutorInfo>& executors,
const std::vector<Task>& tasks);
+ // 'registerFramework()' continuation.
+ void _registerFramework(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const process::Future<Option<Error> >& validationError);
+
+ // 'reregisterFramework()' continuation.
+ void _reregisterFramework(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ bool failover,
+ const process::Future<Option<Error> >& validationError);
+
// Add a framework.
void addFramework(Framework* framework);
@@ -635,13 +648,11 @@ private:
Option<process::Time> electedTime; // Time when this master is elected.
- // Helper method for common FrameworkInfo and PID validation shared
- // by Framework registration and re-registration.
- // An error return value indicates the request is invalid and a
- // FrameworkErrorMessage should be returned.
- // Note that registration/re-registration specific checking is not
- // handled here.
- Try<Nothing> validate(
+ // Validates the framework including authorization.
+ // Returns None if the framework is valid.
+ // Returns Error if the framework is invalid.
+ // Returns Failure if authorization returns 'Failure'.
+ process::Future<Option<Error> > validate(
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 3a28ca2..661d67e 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -599,3 +599,350 @@ TEST_F(MasterAuthorizationTest, ReconcileTask)
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
+
+
+// This test verifies that a framework registration with authorized
+// role is successful.
+TEST_F(MasterAuthorizationTest, AuthorizedRole)
+{
+ // Setup ACLs so that the framework can receive offers for role
+ // "foo".
+ ACLs acls;
+ mesos::ACL::ReceiveOffers* acl = acls.add_receive_offers();
+ acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+ acl->mutable_roles()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.roles = "foo";
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("foo");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ driver.start();
+
+ AWAIT_READY(registered);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that a framework registration with unauthorized
+// role is denied.
+TEST_F(MasterAuthorizationTest, UnauthorizedRole)
+{
+ // Setup ACLs so that no framework can receive offers for role
+ // "foo".
+ ACLs acls;
+ mesos::ACL::ReceiveOffers* acl = acls.add_receive_offers();
+ acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+ acl->mutable_roles()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.roles = "foo";
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("foo");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> error;
+ EXPECT_CALL(sched, error(&driver, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ driver.start();
+
+ // Framework should get error message from the master.
+ AWAIT_READY(error);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that an authentication request that comes from
+// the same instance of the framework (e.g., ZK blip) before
+// 'Master::_registerFramework()' from an earlier attempt, causes the
+// master to successfully register the framework.
+TEST_F(MasterAuthorizationTest, DuplicateRegistration)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ // Create a detector for the scheduler driver because we want the
+ // spurious leading master change to be known by the scheduler
+ // driver only.
+ StandaloneMasterDetector detector(master.get());
+ MockScheduler sched;
+ TestingMesosSchedulerDriver driver(&sched, &detector);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ // Return pending futures from authorizer.
+ Future<Nothing> future1;
+ Promise<bool> promise1;
+ Future<Nothing> future2;
+ Promise<bool> promise2;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future1),
+ Return(promise1.future())))
+ .WillOnce(DoAll(FutureSatisfy(&future2),
+ Return(promise2.future())));
+
+ driver.start();
+
+ // Wait until first authorization attempt is in progress.
+ AWAIT_READY(future1);
+
+ // Simulate a spurious leading master change at the scheduler.
+ detector.appoint(master.get());
+
+ // Wait until second authorization attempt is in progress.
+ AWAIT_READY(future2);
+
+ // Now complete the first authorization attempt.
+ promise1.set(true);
+
+ // First registration request should succeed because the
+ // framework PID did not change.
+ AWAIT_READY(registered);
+
+ Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
+ FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
+
+ // Now complete the second authorization attempt.
+ promise2.set(true);
+
+ // Master should acknowledge the second registration attempt too.
+ AWAIT_READY(frameworkRegisteredMessage);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that an authentication request that comes from
+// the same instance of the framework (e.g., ZK blip) before
+// 'Master::_reregisterFramework()' from an earlier attempt, causes
+// the master to successfully re-register the framework.
+TEST_F(MasterAuthorizationTest, DuplicateReregistration)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ // Create a detector for the scheduler driver because we want the
+ // spurious leading master change to be known by the scheduler
+ // driver only.
+ StandaloneMasterDetector detector(master.get());
+ MockScheduler sched;
+ TestingMesosSchedulerDriver driver(&sched, &detector);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ // Return pending futures from authorizer after the first attempt.
+ Future<Nothing> future2;
+ Promise<bool> promise2;
+ Future<Nothing> future3;
+ Promise<bool> promise3;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+ .WillOnce(Return(true))
+ .WillOnce(DoAll(FutureSatisfy(&future2),
+ Return(promise2.future())))
+ .WillOnce(DoAll(FutureSatisfy(&future3),
+ Return(promise3.future())));
+
+ driver.start();
+
+ // Wait for the framework to be registered.
+ AWAIT_READY(registered);
+
+ EXPECT_CALL(sched, disconnected(&driver));
+
+ // Simulate a spurious leading master change at the scheduler.
+ detector.appoint(master.get());
+
+ // Wait until the second authorization attempt is in progress.
+ AWAIT_READY(future2);
+
+ // Simulate another spurious leading master change at the scheduler.
+ detector.appoint(master.get());
+
+ // Wait until the third authorization attempt is in progress.
+ AWAIT_READY(future3);
+
+ Future<Nothing> reregistered;
+ EXPECT_CALL(sched, reregistered(&driver, _))
+ .WillOnce(FutureSatisfy(&reregistered));
+
+ // Now complete the second authorization attempt.
+ promise2.set(true);
+
+ // First re-registration request should succeed because the
+ // framework PID did not change.
+ AWAIT_READY(reregistered);
+
+ Future<FrameworkReregisteredMessage> frameworkReregisteredMessage =
+ FUTURE_PROTOBUF(FrameworkReregisteredMessage(), _, _);
+
+ // Now complete the third authorization attempt.
+ promise3.set(true);
+
+ // Master should acknowledge the second re-registration attempt too.
+ AWAIT_READY(frameworkReregisteredMessage);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test ensures that a framework that is removed while
+// authorization for registration is in progress is properly handled.
+TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.start();
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ // Stop the framework.
+ // At this point the framework is disconnected but the master does
+ // not take any action because the framework is not in its map yet.
+ driver.stop();
+ driver.join();
+
+ // Settle the clock here to ensure master handles the framework
+ // 'exited' event.
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // When the master tries to link to a non-existent framework PID
+ // it should realize the framework is gone and remove it.
+ AWAIT_READY(frameworkRemoved);
+
+ Shutdown();
+}
+
+
+// This test ensures that a framework that is removed while
+// authorization for re-registration is in progress is properly
+// handled.
+TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ // Create a detector for the scheduler driver because we want the
+ // spurious leading master change to be known by the scheduler
+ // driver only.
+ StandaloneMasterDetector detector(master.get());
+ MockScheduler sched;
+ TestingMesosSchedulerDriver driver(&sched, &detector);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ // Return a pending future from authorizer after first attempt.
+ Future<Nothing> future2;
+ Promise<bool> promise2;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+ .WillOnce(Return(true))
+ .WillOnce(DoAll(FutureSatisfy(&future2),
+ Return(promise2.future())));
+
+ driver.start();
+
+ // Wait until the framework is registered.
+ AWAIT_READY(registered);
+
+ EXPECT_CALL(sched, disconnected(&driver));
+
+ // Framework should not be re-registered.
+ EXPECT_CALL(sched, reregistered(&driver, _))
+ .Times(0);
+
+ // Simulate a spurious leading master change at the scheduler.
+ detector.appoint(master.get());
+
+ // Wait until the second authorization attempt is in progress.
+ AWAIT_READY(future2);
+
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ // Stop the framework.
+ driver.stop();
+ driver.join();
+
+ // Wait until the framework is removed.
+ AWAIT_READY(frameworkRemoved);
+
+ // Now complete the second authorization attempt.
+ promise2.set(true);
+
+ // Master should drop the second framework re-registration request
+ // because the framework PID was removed from 'authenticated' map.
+ // Settle the clock here to ensure 'Master::_reregisterFramework()'
+ // is executed.
+ Clock::pause();
+ Clock::settle();
+
+ Shutdown();
+}
[5/7] git commit: Injected Authorizer into Master.
Posted by vi...@apache.org.
Injected Authorizer into Master.
Review: https://reviews.apache.org/r/22150
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5085168
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5085168
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5085168
Branch: refs/heads/vinod/authorize_tasks
Commit: b5085168f2ada382df2eaf600a5349e21e2b38d3
Parents: 6267a0f
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat May 31 19:04:02 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/local/local.cpp | 29 +++++++++++++++++++++++++++--
src/master/main.cpp | 21 +++++++++++++++++++++
src/master/master.cpp | 14 ++++++--------
src/master/master.hpp | 5 +++--
src/tests/cluster.hpp | 22 +++++++++++++++++++++-
5 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 5d26aff..e05a225 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -21,13 +21,17 @@
#include <sstream>
#include <vector>
+#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
#include <stout/path.hpp>
+#include <stout/try.hpp>
#include <stout/strings.hpp>
+#include "authorizer/authorizer.hpp"
+
#include "common/protobuf_utils.hpp"
#include "local.hpp"
@@ -67,6 +71,7 @@ using mesos::internal::master::Repairer;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
+using process::Owned;
using process::PID;
using process::UPID;
@@ -92,6 +97,7 @@ static Master* master = NULL;
static map<Containerizer*, Slave*> slaves;
static StandaloneMasterDetector* detector = NULL;
static MasterContender* contender = NULL;
+static Option<Authorizer*> authorizer = None();
static Files* files = NULL;
@@ -153,15 +159,29 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
contender = new StandaloneMasterContender();
detector = new StandaloneMasterDetector();
- master =
- new Master(
+
+ if (flags.acls.isSome()) {
+ Try<Owned<Authorizer> > authorizer_ =
+ Authorizer::create(flags.acls.get());
+
+ if (authorizer_.isError()) {
+ EXIT(1) << "Failed to initialize the authorizer: "
+ << authorizer_.error() << " (see --acls flag)";
+ }
+ Owned<Authorizer> authorizer__ = authorizer_.get();
+ authorizer = authorizer__.release();
+ }
+
+ master = new Master(
_allocator,
registrar,
repairer,
files,
contender,
detector,
+ authorizer,
flags);
+
detector->appoint(master->info());
}
@@ -222,6 +242,11 @@ void shutdown()
slaves.clear();
+ if (authorizer.isSome()) {
+ delete authorizer.get();
+ authorizer = None();
+ }
+
delete detector;
detector = NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 8ceaae6..68cd56b 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -22,18 +22,22 @@
#include <mesos/mesos.hpp>
+#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/nothing.hpp>
+#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
+#include "authorizer/authorizer.hpp"
+
#include "common/build.hpp"
#include "common/protobuf_utils.hpp"
@@ -64,6 +68,7 @@ using namespace zookeeper;
using mesos::MasterInfo;
+using process::Owned;
using process::UPID;
using std::cerr;
@@ -243,6 +248,17 @@ int main(int argc, char** argv)
}
detector = detector_.get();
+ Option<Authorizer*> authorizer = None();
+ if (flags.acls.isSome()) {
+ Try<Owned<Authorizer> > authorizer_ = Authorizer::create(flags.acls.get());
+ if (authorizer_.isError()) {
+ EXIT(1) << "Failed to initialize the authorizer: "
+ << authorizer_.error() << " (see --acls flag)";
+ }
+ Owned<Authorizer> authorizer__ = authorizer_.get();
+ authorizer = authorizer__.release();
+ }
+
LOG(INFO) << "Starting Mesos master";
Master* master =
@@ -253,6 +269,7 @@ int main(int argc, char** argv)
&files,
contender,
detector,
+ authorizer,
flags);
if (zk.isNone()) {
@@ -277,5 +294,9 @@ int main(int argc, char** argv)
delete contender;
delete detector;
+ if (authorizer.isSome()) {
+ delete authorizer.get();
+ }
+
return 0;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c18ccc4..7884aa4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -28,6 +28,7 @@
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
+#include <process/owned.hpp>
#include <process/run.hpp>
#include <process/metrics/metrics.hpp>
@@ -38,6 +39,7 @@
#include <stout/multihashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
+#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
@@ -216,6 +218,7 @@ Master::Master(
Files* _files,
MasterContender* _contender,
MasterDetector* _detector,
+ const Option<Authorizer*>& _authorizer,
const Flags& _flags)
: ProcessBase("master"),
http(*this),
@@ -226,6 +229,7 @@ Master::Master(
files(_files),
contender(_contender),
detector(_detector),
+ authorizer(_authorizer),
metrics(*this),
electedTime(None())
{
@@ -336,14 +340,8 @@ void Master::initialize()
<< " (see --credentials flag)";
}
- if (flags.acls.isSome()) {
- LOG(INFO) << "Master enabling authorization";
- Try<Owned<Authorizer> > authorizer_ = Authorizer::create(flags.acls.get());
- if (authorizer_.isError()) {
- EXIT(1) << "Failed to initialize the Authorizer: "
- << authorizer_.error() << " (see --acls flag)";
- }
- authorizer = authorizer_.get();
+ if (authorizer.isSome()) {
+ LOG(INFO) << "Authorization enabled";
}
hashmap<string, RoleInfo> roleInfos;
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 26af113..75f0d49 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -100,6 +100,7 @@ public:
Files* files,
MasterContender* contender,
MasterDetector* detector,
+ const Option<Authorizer*>& authorizer,
const Flags& flags = Flags());
virtual ~Master();
@@ -401,6 +402,8 @@ private:
MasterContender* contender;
MasterDetector* detector;
+ const Option<Authorizer*> authorizer;
+
MasterInfo info_;
// Indicates when recovery is complete. Recovery begins once the
@@ -468,8 +471,6 @@ private:
// Principals of authenticated frameworks/slaves keyed by PID.
hashmap<process::UPID, std::string> authenticated;
- Option<process::Owned<Authorizer> > authorizer;
-
int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index f4cc9a6..449165c 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -45,6 +45,8 @@
#include "linux/cgroups.hpp"
#endif // __linux__
+#include "authorizer/authorizer.hpp"
+
#include "log/log.hpp"
#include "log/tool/initialize.hpp"
@@ -131,7 +133,8 @@ public:
registrar(NULL),
repairer(NULL),
contender(NULL),
- detector(NULL) {}
+ detector(NULL),
+ authorizer(None()) {}
master::Master* master;
master::allocator::Allocator* allocator;
@@ -143,6 +146,7 @@ public:
master::Repairer* repairer;
MasterContender* contender;
MasterDetector* detector;
+ Option<Authorizer*> authorizer;
};
std::map<process::PID<master::Master>, Master> masters;
@@ -350,6 +354,17 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
master.detector = new StandaloneMasterDetector();
}
+ if (flags.acls.isSome()) {
+ Try<process::Owned<Authorizer> > authorizer_ =
+ Authorizer::create(flags.acls.get());
+ if (authorizer_.isError()) {
+ return Error("Failed to initialize the authorizer: " +
+ authorizer_.error() + " (see --acls flag)");
+ }
+ process::Owned<Authorizer> authorizer__ = authorizer_.get();
+ master.authorizer = authorizer__.release();
+ }
+
master.master = new master::Master(
master.allocator,
master.registrar,
@@ -357,6 +372,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
&cluster->files,
master.contender,
master.detector,
+ master.authorizer,
flags);
if (url.isNone()) {
@@ -408,6 +424,10 @@ inline Try<Nothing> Cluster::Masters::stop(
delete master.contender;
delete master.detector;
+ if (master.authorizer.isSome()) {
+ delete master.authorizer.get();
+ }
+
masters.erase(pid);
return Nothing();
[2/7] git commit: Authorized launch tasks.
Posted by vi...@apache.org.
Authorized launch tasks.
Review: https://reviews.apache.org/r/22151
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6c4ee70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6c4ee70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6c4ee70
Branch: refs/heads/vinod/authorize_tasks
Commit: a6c4ee70262380c3ecb3c43c8eee0108f6b224eb
Parents: b73ea62
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed May 28 16:45:22 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/common/protobuf_utils.hpp | 2 +
src/master/hierarchical_allocator_process.hpp | 1 -
src/master/master.cpp | 474 +++++++++++++++------
src/master/master.hpp | 36 +-
src/tests/master_authorization_tests.cpp | 209 +++++++++
6 files changed, 575 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1d49dca..c91b438 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -975,6 +975,7 @@ mesos_tests_SOURCES = \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
+ tests/master_authorization_tests.cpp \
tests/master_contender_detector_tests.cpp \
tests/master_tests.cpp \
tests/mesos.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0f65341..12ff00a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -48,6 +48,8 @@ inline bool isTerminalState(const TaskState& state)
}
+// TODO(vinod): Make SlaveID optional because 'StatusUpdate.SlaveID'
+// is optional.
inline StatusUpdate createStatusUpdate(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 0c5e2e0..1765e70 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -553,7 +553,6 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused(
// result of a valid task launch by an active
// framework that doesn't use the entire offer.
CHECK(frameworks.contains(frameworkId));
-
const std::string& role = frameworks[frameworkId].role();
sorters[role]->unallocated(frameworkId.value(), resources);
sorters[role]->remove(resources);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index df75c8a..1a2b219 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -25,6 +25,8 @@
#include <list>
#include <sstream>
+#include <process/check.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
@@ -68,6 +70,7 @@ using std::list;
using std::string;
using std::vector;
+using process::await;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Clock;
using process::Failure;
@@ -576,6 +579,9 @@ void Master::finalize()
// allocator or the roles because it is unnecessary bookkeeping at
// this point since we are shutting down.
foreachvalue (Framework* framework, frameworks.activated) {
+ // Remove pending tasks from the framework.
+ framework->pendingTasks.clear();
+
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
@@ -1387,6 +1393,7 @@ struct TaskInfoVisitor
virtual ~TaskInfoVisitor() {}
};
+
// Checks that a task id is valid, i.e., contains only valid characters.
struct TaskIDChecker : TaskInfoVisitor
{
@@ -1445,22 +1452,21 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
{
const TaskID& taskId = task.task_id();
- if (ids.contains(taskId) || framework.tasks.contains(taskId)) {
+ if (framework.pendingTasks.contains(taskId) ||
+ framework.tasks.contains(taskId)) {
return "Task has duplicate ID: " + taskId.value();
}
-
- ids.insert(taskId);
-
return None();
}
-
- hashset<TaskID> ids;
};
-// Checks that the used resources by a task (and executor if
-// necessary) on each slave does not exceed the total resources
-// offered on that slave
+// Checks that the used resources by a task on each slave does not
+// exceed the total resources offered on that slave.
+// NOTE: We do not account for executor resources here because tasks
+// are launched asynchronously and an executor might exit between
+// validation and actual launch. Therefore executor resources are
+// accounted for in 'Master::_launchTasks()'.
struct ResourceUsageChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
@@ -1482,11 +1488,10 @@ struct ResourceUsageChecker : TaskInfoVisitor
// Check if this task uses more resources than offered.
Resources taskResources = task.resources();
- if (!((usedResources + taskResources) <= resources)) {
+ if (!(taskResources <= resources)) {
return "Task " + stringify(task.task_id()) + " attempted to use " +
- stringify(taskResources) + " combined with already used " +
- stringify(usedResources) + " is greater than offered " +
- stringify(resources);
+ stringify(taskResources) + " which is greater than offered " +
+ stringify(resources);
}
// Check this task's executor's resources.
@@ -1496,33 +1501,13 @@ struct ResourceUsageChecker : TaskInfoVisitor
if (!Resources::isAllocatable(resource)) {
// TODO(benh): Send back the invalid resources?
return "Executor for task " + stringify(task.task_id()) +
- " uses invalid resources " + stringify(resource);
- }
- }
-
- // Check if this task's executor is running, and if not check if
- // the task + the executor use more resources than offered.
- if (!executors.contains(task.executor().executor_id())) {
- if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
- taskResources += task.executor().resources();
- if (!((usedResources + taskResources) <= resources)) {
- return "Task " + stringify(task.task_id()) +
- " + executor attempted to use " + stringify(taskResources) +
- " combined with already used " + stringify(usedResources) +
- " is greater than offered " + stringify(resources);
- }
+ " uses invalid resources " + stringify(resource);
}
- executors.insert(task.executor().executor_id());
}
}
- usedResources += taskResources;
-
return None();
}
-
- Resources usedResources;
- hashset<ExecutorID> executors;
};
@@ -1544,11 +1529,26 @@ struct ExecutorInfoChecker : TaskInfoVisitor
if (task.has_executor()) {
const ExecutorID& executorId = task.executor().executor_id();
+ Option<ExecutorInfo> executorInfo = None();
+
if (slave.hasExecutor(framework.id, executorId)) {
- const Option<ExecutorInfo> executorInfo =
- slave.executors.get(framework.id).get().get(executorId);
+ executorInfo = slave.executors.get(framework.id).get().get(executorId);
+ } else {
+ // See if any of the pending tasks have the same executor.
+ // Note that picking the first matching executor is ok because
+ // all the matching executors have been added to
+ // 'framework.pendingTasks' after validation and hence have
+ // the same executor info.
+ foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
+ if (task_.has_executor() &&
+ task_.executor().executor_id() == executorId) {
+ executorInfo = task_.executor();
+ break;
+ }
+ }
+ }
- if (!(task.executor() == executorInfo.get())) {
+ if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
return "Task has invalid ExecutorInfo (existing ExecutorInfo"
" with same ExecutorID is not compatible).\n"
"------------------------------------------------------------\n"
@@ -1558,7 +1558,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
"Task's ExecutorInfo:\n" +
stringify(task.executor()) + "\n"
"------------------------------------------------------------\n";
- }
}
}
@@ -1854,9 +1853,55 @@ void Master::launchTasks(
<< " on slave " << *slave
<< " for framework " << framework->id;
- Resources usedResources; // Accumulated resources used.
+ // Validate each task and launch if valid.
+ list<Future<Option<Error> > > futures;
+ foreach (const TaskInfo& task, tasks) {
+ futures.push_back(validateTask(task, framework, slave, totalResources));
+
+ // Add to pending tasks.
+ // NOTE: This must happen after validating the task, because the
+ // validators consult 'pendingTasks' (e.g., to detect duplicate IDs).
+ framework->pendingTasks[task.task_id()] = task;
+ }
+
+ // Wait for all the tasks to be validated.
+ // NOTE: We wait for all tasks because currently the allocator
+ // is expected to get 'resourcesUnused()' once per 'launchTasks()'.
+ await(futures)
+ .onAny(defer(self(),
+ &Master::_launchTasks,
+ framework->id,
+ slaveId.get(),
+ tasks,
+ totalResources,
+ filters,
+ lambda::_1));
+}
+
+
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+ if (authorized) {
+ return None();
+ }
+
+ return Error(message);
+}
+
+
+Future<Option<Error> > Master::validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave,
+ const Resources& totalResources)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
// Create task visitors.
+ // TODO(vinod): Create the visitors on the stack and make the visit
+ // operation const.
list<TaskInfoVisitor*> taskVisitors;
taskVisitors.push_back(new TaskIDChecker());
taskVisitors.push_back(new SlaveIDChecker());
@@ -1867,40 +1912,241 @@ void Master::launchTasks(
// TODO(benh): Add a HealthCheckChecker visitor.
- // Loop through each task and check it's validity.
- foreach (const TaskInfo& task, tasks) {
- // Possible error found while checking task's validity.
- Option<Error> error = None();
+ // Invoke each visitor.
+ Option<Error> error = None();
+ foreach (TaskInfoVisitor* visitor, taskVisitors) {
+ error = (*visitor)(task, totalResources, *framework, *slave);
+ if (error.isSome()) {
+ break;
+ }
+ }
- // Invoke each visitor.
- foreach (TaskInfoVisitor* visitor, taskVisitors) {
- error = (*visitor)(task, totalResources, *framework, *slave);
- if (error.isSome()) {
- break;
- }
+ // Cleanup visitors.
+ while (!taskVisitors.empty()) {
+ TaskInfoVisitor* visitor = taskVisitors.front();
+ taskVisitors.pop_front();
+ delete visitor;
+ };
+
+ if (error.isSome()) {
+ return Error(error.get().message);
+ }
+
+ if (authorizer.isNone()) {
+ // Authorization is disabled.
+ return None();
+ }
+
+ // Authorize the task.
+ string user = framework->info.user(); // Default user.
+ if (task.has_command() && task.command().has_user()) {
+ user = task.command().user();
+ } else if (task.has_executor() && task.executor().command().has_user()) {
+ user = task.executor().command().user();
+ }
+
+ LOG(INFO)
+ << "Authorizing framework principal '" << framework->info.principal()
+ << "' to launch task " << task.task_id() << " as user '" << user << "'";
+
+ mesos::ACL::RunTasks request;
+ if (framework->info.has_principal()) {
+ request.mutable_principals()->add_values(framework->info.principal());
+ } else {
+ // Framework doesn't have a principal set.
+ request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+ }
+ request.mutable_users()->add_values(user);
+
+ return authorizer.get()->authorize(request).then(
+ lambda::bind(&_authorize,
+ "Not authorized to launch as user '" + user + "'",
+ lambda::_1));
+}
+
+
+void Master::launchTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+ CHECK(!slave->disconnected);
+
+ // Determine if this task launches an executor, and if so make sure
+ // the slave and framework state has been updated accordingly.
+ Option<ExecutorID> executorId;
+
+ if (task.has_executor()) {
+ // TODO(benh): Refactor this code into Slave::addTask.
+ if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
+ CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
+ << "Executor " << task.executor().executor_id()
+ << " known to the framework " << framework->id
+ << " but unknown to the slave " << *slave;
+
+ slave->addExecutor(framework->id, task.executor());
+ framework->addExecutor(slave->id, task.executor());
}
- if (error.isNone()) {
- // Task looks good, get it running!
- usedResources += launchTask(task, framework, slave);
- } else {
- // Error validating task, send a failed status update.
- LOG(WARNING) << "Failed to validate task " << task.task_id()
- << " : " << error.get().message;
+ executorId = task.executor().executor_id();
+ }
+
+ // Add the task to the framework and slave.
+ Task* t = new Task();
+ t->mutable_framework_id()->MergeFrom(framework->id);
+ t->set_state(TASK_STAGING);
+ t->set_name(task.name());
+ t->mutable_task_id()->MergeFrom(task.task_id());
+ t->mutable_slave_id()->MergeFrom(task.slave_id());
+ t->mutable_resources()->MergeFrom(task.resources());
+
+ if (executorId.isSome()) {
+ t->mutable_executor_id()->MergeFrom(executorId.get());
+ }
+
+ framework->addTask(t);
+
+ slave->addTask(t);
+
+ // Tell the slave to launch the task!
+ LOG(INFO) << "Launching task " << task.task_id()
+ << " of framework " << framework->id
+ << " with resources " << task.resources()
+ << " on slave " << *slave;
+
+ RunTaskMessage message;
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(slave->pid, message);
+
+ stats.tasks[TASK_STAGING]++;
+ return;
+}
+
+
+void Master::_launchTasks(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const vector<TaskInfo>& tasks,
+ const Resources& totalResources,
+ const Filters& filters,
+ const Future<list<Future<Option<Error> > > >& validationErrors)
+{
+ CHECK_READY(validationErrors);
+ CHECK_EQ(validationErrors.get().size(), tasks.size());
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring launch tasks message for framework " << frameworkId
+ << " because the framework cannot be found";
+
+ // Tell the allocator about the recovered resources.
+ allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+ return;
+ }
+
+ Slave* slave = getSlave(slaveId);
+ if (slave == NULL || slave->disconnected) {
+ foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
- slave->id,
+ task.slave_id(),
+ task.task_id(),
+ TASK_LOST,
+ (slave == NULL ? "Slave removed" : "Slave disconnected"));
+
+ LOG(INFO) << "Sending status update " << update << ": "
+ << (slave == NULL ? "Slave removed" : "Slave disconnected");
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+ }
+
+ // Tell the allocator about the recovered resources.
+ allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+ return;
+ }
+
+ Resources usedResources; // Accumulated resources used.
+
+ size_t index = 0;
+ foreach (const Future<Option<Error> >& future, validationErrors.get()) {
+ const TaskInfo& task = tasks[index++];
+
+ // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
+ // for the task was called before we are here.
+ if (!framework->pendingTasks.contains(task.task_id())) {
+ continue;
+ }
+
+ framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+
+ CHECK(!future.isDiscarded());
+ if (future.isFailed() || future.get().isSome()) {
+ const string error = future.isFailed()
+ ? "Authorization failure: " + future.failure()
+ : future.get().get().message;
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ framework->id,
+ task.slave_id(),
+ task.task_id(),
+ TASK_LOST,
+ error);
+
+ LOG(INFO)
+ << "Sending status update " << update << ": " << error;
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+
+ continue;
+ }
+
+ // Check if resources needed by the task (and its executor in case
+ // the executor is new) are available. These resources will be
+ // added by 'launchTask()' below.
+ Resources resources = task.resources();
+ if (task.has_executor() &&
+ !slave->hasExecutor(framework->id, task.executor().executor_id())) {
+ resources += task.executor().resources();
+ }
+
+ if (!(usedResources + resources <= totalResources)) {
+ const string error =
+ "Task uses more resources " + stringify(resources) +
+ " than available " + stringify(totalResources - usedResources);
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ framework->id,
+ task.slave_id(),
task.task_id(),
TASK_LOST,
- error.get().message);
+ error);
+
+ LOG(INFO) << "Sending status update " << update << " for invalid task: "
+ << error;
- LOG(INFO) << "Sending status update "
- << update << " for invalid task";
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
+
+ continue;
}
+
+ // Launch task.
+ launchTask(task, framework, slave);
+ usedResources += resources;
}
// All used resources should be allocatable, enforced by our validators.
@@ -1911,19 +2157,8 @@ void Master::launchTasks(
if (unusedResources.allocatable().size() > 0) {
// Tell the allocator about the unused (e.g., refused) resources.
- allocator->resourcesUnused(
- framework->id,
- slave->id,
- unusedResources,
- filters);
+ allocator->resourcesUnused(frameworkId, slaveId, unusedResources, filters);
}
-
- // Cleanup visitors.
- while (!taskVisitors.empty()) {
- TaskInfoVisitor* visitor = taskVisitors.front();
- taskVisitors.pop_front();
- delete visitor;
- };
}
@@ -1981,6 +2216,24 @@ void Master::killTask(
return;
}
+ if (framework->pendingTasks.contains(taskId)) {
+ // Remove from pending tasks.
+ framework->pendingTasks.erase(taskId);
+
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->set_state(TASK_KILLED);
+ status->set_message("Killed pending task");
+ update->set_timestamp(Clock::now().secs());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
+
+ return;
+ }
+
Task* task = framework->getTask(taskId);
if (task == NULL) {
// TODO(bmahler): Per MESOS-1200, if we knew the SlaveID here we
@@ -2778,6 +3031,13 @@ void Master::reconcileTasks(
continue;
}
+ if (framework->pendingTasks.contains(status.task_id())) {
+ LOG(WARNING) << "Status for task " << status.task_id()
+ << " from framework " << frameworkId
+ << " is unknown since the task is pending";
+ continue;
+ }
+
Option<StatusUpdate> update;
// Check for a removed slave (case 1).
@@ -3052,73 +3312,6 @@ vector<Framework*> Master::getActiveFrameworks() const
}
-Resources Master::launchTask(const TaskInfo& task,
- Framework* framework,
- Slave* slave)
-{
- CHECK_NOTNULL(framework);
- CHECK_NOTNULL(slave);
-
- Resources resources; // Total resources used on slave by launching this task.
-
- // Determine if this task launches an executor, and if so make sure
- // the slave and framework state has been updated accordingly.
- Option<ExecutorID> executorId;
-
- if (task.has_executor()) {
- // TODO(benh): Refactor this code into Slave::addTask.
- if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
- CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
- << "Executor " << task.executor().executor_id()
- << " known to the framework " << framework->id
- << " but unknown to the slave " << *slave;
-
- slave->addExecutor(framework->id, task.executor());
- framework->addExecutor(slave->id, task.executor());
- resources += task.executor().resources();
- }
-
- executorId = task.executor().executor_id();
- }
-
- // Add the task to the framework and slave.
- Task* t = new Task();
- t->mutable_framework_id()->MergeFrom(framework->id);
- t->set_state(TASK_STAGING);
- t->set_name(task.name());
- t->mutable_task_id()->MergeFrom(task.task_id());
- t->mutable_slave_id()->MergeFrom(task.slave_id());
- t->mutable_resources()->MergeFrom(task.resources());
-
- if (executorId.isSome()) {
- t->mutable_executor_id()->MergeFrom(executorId.get());
- }
-
- framework->addTask(t);
-
- slave->addTask(t);
-
- resources += task.resources();
-
- // Tell the slave to launch the task!
- LOG(INFO) << "Launching task " << task.task_id()
- << " of framework " << framework->id
- << " with resources " << task.resources()
- << " on slave " << *slave;
-
- RunTaskMessage message;
- message.mutable_framework()->MergeFrom(framework->info);
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.set_pid(framework->pid);
- message.mutable_task()->MergeFrom(task);
- send(slave->pid, message);
-
- stats.tasks[TASK_STAGING]++;
-
- return resources;
-}
-
-
// NOTE: This function is only called when the slave re-registers
// with a master that already knows about it (i.e., not a failed
// over master).
@@ -3350,6 +3543,9 @@ void Master::removeFramework(Framework* framework)
send(slave->pid, message);
}
+ // Remove the pending tasks from the framework.
+ framework->pendingTasks.clear();
+
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 75f0d49..0c68a5b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,10 +86,10 @@ class SlaveObserver;
class WhitelistWatcher;
struct Framework;
-struct Slave;
-struct Role;
struct OfferVisitor;
-
+struct Role;
+struct Slave;
+struct TaskInfoVisitor;
class Master : public ProtobufProcess<Master>
{
@@ -310,11 +310,27 @@ protected:
const std::vector<StatusUpdate>& updates,
const process::Future<bool>& removed);
- // Launch a task from a task description, and returned the consumed
- // resources for the task and possibly it's executor.
- Resources launchTask(const TaskInfo& task,
- Framework* framework,
- Slave* slave);
+ // Validates the task including authorization.
+ // Returns None if the task is valid.
+ // Returns Error if the task is invalid.
+ // Returns Failure if authorization returns 'Failure'.
+ process::Future<Option<Error> > validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave,
+ const Resources& totalResources);
+
+ // Launch a task from a task description.
+ void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
+ // 'launchTasks()' continuation.
+ void _launchTasks(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<TaskInfo>& tasks,
+ const Resources& totalResources,
+ const Filters& filters,
+ const process::Future<std::list<process::Future<Option<Error> > > >& f);
// Remove a task.
void removeTask(Task* task);
@@ -901,6 +917,10 @@ struct Framework
process::Time reregisteredTime;
process::Time unregisteredTime;
+ // Tasks that have not yet been launched because they are being
+ // validated (e.g., authorized).
+ hashmap<TaskID, TaskInfo> pendingTasks;
+
hashmap<TaskID, Task*> tasks;
// NOTE: We use a shared pointer for Task because clang doesn't like
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
new file mode 100644
index 0000000..17debaf
--- /dev/null
+++ b/src/tests/master_authorization_tests.cpp
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterAuthorizationTest : public MesosTest {};
+
+
+// This test verifies that an authorized task launch is successful.
+TEST_F(MasterAuthorizationTest, AuthorizedTask)
+{
+ // Setup ACLs so that the framework can only launch tasks as "foo".
+ ACLs acls;
+ acls.set_permissive(false);
+ mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+ acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+ acl->mutable_users()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Create an authorized executor.
+ ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+ executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+ executor.mutable_command()->set_user("foo");
+
+ MockExecutor exec(executor.executor_id());
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // Create an authorized task.
+ TaskInfo task;
+ task.set_name("test");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(executor);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that an unauthorized task launch is rejected.
+TEST_F(MasterAuthorizationTest, UnauthorizedTask)
+{
+ // Setup ACLs so that no framework can launch as "foo".
+ ACLs acls;
+ mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+ acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+ acl->mutable_users()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Create an unauthorized executor.
+ ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+ executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+ executor.mutable_command()->set_user("foo");
+
+ MockExecutor exec(executor.executor_id());
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // Create an unauthorized task.
+ TaskInfo task;
+ task.set_name("test");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(executor);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
[3/7] git commit: Split Authorizer into source and header files.
Posted by vi...@apache.org.
Split Authorizer into source and header files.
Review: https://reviews.apache.org/r/22149
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6267a0f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6267a0f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6267a0f0
Branch: refs/heads/vinod/authorize_tasks
Commit: 6267a0f011ecaa8e0ecd151c4d02e5285b680931
Parents: dd94a1f
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat May 31 19:03:08 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 6 +-
src/authorizer/authorizer.cpp | 333 +++++++++++++++++++++++++++++++++++++
src/authorizer/authorizer.hpp | 310 +---------------------------------
3 files changed, 338 insertions(+), 311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 4a3f2e1..1d49dca 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -210,7 +210,7 @@ nodist_libmesos_no_3rdparty_la_SOURCES = \
$(REGISTRY_PROTOS)
libmesos_no_3rdparty_la_SOURCES = \
- authorizer/authorizer.hpp \
+ authorizer/authorizer.cpp \
sasl/authenticatee.hpp \
sasl/authenticator.hpp \
sasl/auxprop.hpp \
@@ -325,7 +325,9 @@ if WITH_NETWORK_ISOLATOR
linux/routing/queueing/internal.hpp
endif
-libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
+libmesos_no_3rdparty_la_SOURCES += \
+ authorizer/authorizer.hpp \
+ common/attributes.hpp \
common/build.hpp common/date_utils.hpp common/factory.hpp \
common/protobuf_utils.hpp \
common/http.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/authorizer/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/authorizer.cpp b/src/authorizer/authorizer.cpp
new file mode 100644
index 0000000..c46b31d
--- /dev/null
+++ b/src/authorizer/authorizer.cpp
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+
+#include "authorizer/authorizer.hpp"
+
+#include "mesos/mesos.hpp"
+
+using process::Future;
+using process::Owned;
+using process::dispatch;
+
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+
+class LocalAuthorizerProcess : public ProtobufProcess<LocalAuthorizerProcess>
+{
+public:
+ LocalAuthorizerProcess(const ACLs& _acls)
+ : ProcessBase(process::ID::generate("authorizer")), acls(_acls) {}
+
+ Future<bool> authorize(const ACL::RunTasks& request)
+ {
+ foreach (const ACL::RunTasks& acl, acls.run_tasks()) {
+ // ACL matches if both subjects and objects match.
+ if (matches(request.principals(), acl.principals()) &&
+ matches(request.users(), acl.users())) {
+ // ACL is allowed if both subjects and objects are allowed.
+ return allows(request.principals(), acl.principals()) &&
+ allows(request.users(), acl.users());
+ }
+ }
+
+ return acls.permissive(); // None of the ACLs match.
+ }
+
+ Future<bool> authorize(const ACL::ReceiveOffers& request)
+ {
+ foreach (const ACL::ReceiveOffers& acl, acls.receive_offers()) {
+ // ACL matches if both subjects and objects match.
+ if (matches(request.principals(), acl.principals()) &&
+ matches(request.roles(), acl.roles())) {
+ // ACL is allowed if both subjects and objects are allowed.
+ return allows(request.principals(), acl.principals()) &&
+ allows(request.roles(), acl.roles());
+ }
+ }
+
+ return acls.permissive(); // None of the ACLs match.
+ }
+
+ Future<bool> authorize(const ACL::HTTPGet& request)
+ {
+ foreach (const ACL::HTTPGet& acl, acls.http_get()) {
+ // ACL matches if both subjects and objects match.
+ if (matches(request.usernames(), acl.usernames()) &&
+ matches(request.ips(), acl.ips()) &&
+ matches(request.hostnames(), acl.hostnames()) &&
+ matches(request.urls(), acl.urls())) {
+ // ACL is allowed if both subjects and objects are allowed.
+ return allows(request.usernames(), acl.usernames()) &&
+ allows(request.ips(), acl.ips()) &&
+ allows(request.hostnames(), acl.hostnames()) &&
+ allows(request.urls(), acl.urls());
+ }
+ }
+
+ return acls.permissive(); // None of the ACLs match.
+ }
+
+ Future<bool> authorize(const ACL::HTTPPut& request)
+ {
+ foreach (const ACL::HTTPPut& acl, acls.http_put()) {
+ // ACL matches if both subjects and objects match.
+ if (matches(request.usernames(), acl.usernames()) &&
+ matches(request.ips(), acl.ips()) &&
+ matches(request.hostnames(), acl.hostnames()) &&
+ matches(request.urls(), acl.urls())) {
+ // ACL is allowed if both subjects and objects are allowed.
+ return allows(request.usernames(), acl.usernames()) &&
+ allows(request.ips(), acl.ips()) &&
+ allows(request.hostnames(), acl.hostnames()) &&
+ allows(request.urls(), acl.urls());
+ }
+ }
+
+ return acls.permissive(); // None of the ACLs match.
+ }
+private:
+ // Match matrix:
+ //
+ // -----------ACL----------
+ //
+ // SOME NONE ANY
+ // -------|-------|-------|-------
+ // | SOME | Yes/No| Yes | Yes
+ // | -------|-------|-------|-------
+ // Request NONE | No | Yes | No
+ // | -------|-------|-------|-------
+ // | ANY | No | Yes | Yes
+ // -------|-------|-------|-------
+ bool matches(const ACL::Entity& request, const ACL::Entity& acl)
+ {
+ // NONE only matches with NONE.
+ if (request.type() == ACL::Entity::NONE) {
+ return acl.type() == ACL::Entity::NONE;
+ }
+
+ // ANY matches with ANY or NONE.
+ if (request.type() == ACL::Entity::ANY) {
+ return acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE;
+ }
+
+ if (request.type() == ACL::Entity::SOME) {
+ // SOME matches with ANY or NONE.
+ if (acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE) {
+ return true;
+ }
+
+ // SOME matches if the request values are a subset of ACL
+ // values.
+ foreach (const string& value, request.values()) {
+ bool found = false;
+ foreach (const string& value_, acl.values()) {
+ if (value == value_) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ return false;
+ }
+
+ // Allow matrix:
+ //
+ // -----------ACL----------
+ //
+ // SOME NONE ANY
+ // -------|-------|-------|-------
+ // | SOME | Yes/No| No | Yes
+ // | -------|-------|-------|-------
+ // Request NONE | No | Yes | No
+ // | -------|-------|-------|-------
+ // | ANY | No | No | Yes
+ // -------|-------|-------|-------
+ bool allows(const ACL::Entity& request, const ACL::Entity& acl)
+ {
+ // NONE is only allowed by NONE.
+ if (request.type() == ACL::Entity::NONE) {
+ return acl.type() == ACL::Entity::NONE;
+ }
+
+ // ANY is only allowed by ANY.
+ if (request.type() == ACL::Entity::ANY) {
+ return acl.type() == ACL::Entity::ANY;
+ }
+
+ if (request.type() == ACL::Entity::SOME) {
+ // SOME is allowed by ANY.
+ if (acl.type() == ACL::Entity::ANY) {
+ return true;
+ }
+
+ // SOME is not allowed by NONE.
+ if (acl.type() == ACL::Entity::NONE) {
+ return false;
+ }
+
+ // SOME is allowed if the request values are a subset of ACL
+ // values.
+ foreach (const string& value, request.values()) {
+ bool found = false;
+ foreach (const string& value_, acl.values()) {
+ if (value == value_) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ return false;
+ }
+
+ ACLs acls;
+};
+
+
+Try<Owned<Authorizer> > Authorizer::create(const JSON::Object& acls_)
+{
+ // Convert ACLs from JSON to Protobuf.
+ Try<ACLs> acls = protobuf::parse<ACLs>(acls_);
+ if (acls.isError()) {
+ return Error("Invalid ACLs format: " + acls.error());
+ }
+
+ Try<Owned<LocalAuthorizer> > authorizer = LocalAuthorizer::create(acls.get());
+
+ if (authorizer.isError()) {
+ return Error(authorizer.error());
+ }
+
+ Owned<LocalAuthorizer> authorizer_ = authorizer.get();
+ return static_cast<Authorizer*>(authorizer_.release());
+}
+
+
+LocalAuthorizer::LocalAuthorizer(const ACLs& acls)
+{
+ process = new LocalAuthorizerProcess(acls);
+ spawn(process);
+}
+
+
+LocalAuthorizer::~LocalAuthorizer()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+Try<Owned<LocalAuthorizer> > LocalAuthorizer::create(const ACLs& acls)
+{
+ // Validate ACLs.
+ foreach (const ACL::HTTPGet& acl, acls.http_get()) {
+ // At least one of the subjects should be set.
+ if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
+ return Error("At least one of the subjects should be set for ACL: " +
+ acl.DebugString());
+ }
+ }
+
+ foreach (const ACL::HTTPPut& acl, acls.http_put()) {
+ // At least one of the subjects should be set.
+ if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
+ return Error("At least one of the subjects should be set for ACL: " +
+ acl.DebugString());
+ }
+ }
+
+ return new LocalAuthorizer(acls);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::RunTasks& request)
+{
+ // Necessary to disambiguate.
+ typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::RunTasks&);
+
+ return dispatch(
+ process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::ReceiveOffers& request)
+{
+ // Necessary to disambiguate.
+ typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::ReceiveOffers&);
+
+ return dispatch(
+ process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::HTTPGet& request)
+{
+ // Necessary to disambiguate.
+ typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::HTTPGet&);
+
+ return dispatch(
+ process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::HTTPPut& request)
+{
+ // Necessary to disambiguate.
+ typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::HTTPPut&);
+
+ return dispatch(
+ process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/authorizer/authorizer.hpp
----------------------------------------------------------------------
diff --git a/src/authorizer/authorizer.hpp b/src/authorizer/authorizer.hpp
index b0d1eae..43e1c3b 100644
--- a/src/authorizer/authorizer.hpp
+++ b/src/authorizer/authorizer.hpp
@@ -19,22 +19,13 @@
#ifndef __AUTHORIZER_AUTHORIZER_HPP__
#define __AUTHORIZER_AUTHORIZER_HPP__
-#include <string>
-#include <vector>
-
#include <glog/logging.h>
-#include <process/dispatch.hpp>
#include <process/future.hpp>
-#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
-#include <process/protobuf.hpp>
-#include <stout/check.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/protobuf.hpp>
+#include <stout/json.hpp>
#include <stout/try.hpp>
#include "mesos/mesos.hpp"
@@ -90,305 +81,6 @@ private:
LocalAuthorizerProcess* process;
};
-
-class LocalAuthorizerProcess : public ProtobufProcess<LocalAuthorizerProcess>
-{
-public:
- LocalAuthorizerProcess(const ACLs& _acls)
- : ProcessBase(process::ID::generate("authorizer")), acls(_acls) {}
-
- process::Future<bool> authorize(const ACL::RunTasks& request)
- {
- foreach (const ACL::RunTasks& acl, acls.run_tasks()) {
- // ACL matches if both subjects and objects match.
- if (matches(request.principals(), acl.principals()) &&
- matches(request.users(), acl.users())) {
- // ACL is allowed if both subjects and objects are allowed.
- return allows(request.principals(), acl.principals()) &&
- allows(request.users(), acl.users());
- }
- }
-
- return acls.permissive(); // None of the ACLs match.
- }
-
- process::Future<bool> authorize(const ACL::ReceiveOffers& request)
- {
- foreach (const ACL::ReceiveOffers& acl, acls.receive_offers()) {
- // ACL matches if both subjects and objects match.
- if (matches(request.principals(), acl.principals()) &&
- matches(request.roles(), acl.roles())) {
- // ACL is allowed if both subjects and objects are allowed.
- return allows(request.principals(), acl.principals()) &&
- allows(request.roles(), acl.roles());
- }
- }
-
- return acls.permissive(); // None of the ACLs match.
- }
-
- process::Future<bool> authorize(const ACL::HTTPGet& request)
- {
- foreach (const ACL::HTTPGet& acl, acls.http_get()) {
- // ACL matches if both subjects and objects match.
- if (matches(request.usernames(), acl.usernames()) &&
- matches(request.ips(), acl.ips()) &&
- matches(request.hostnames(), acl.hostnames()) &&
- matches(request.urls(), acl.urls())) {
- // ACL is allowed if both subjects and objects are allowed.
- return allows(request.usernames(), acl.usernames()) &&
- allows(request.ips(), acl.ips()) &&
- allows(request.hostnames(), acl.hostnames()) &&
- allows(request.urls(), acl.urls());
- }
- }
-
- return acls.permissive(); // None of the ACLs match.
- }
-
- process::Future<bool> authorize(const ACL::HTTPPut& request)
- {
- foreach (const ACL::HTTPPut& acl, acls.http_put()) {
- // ACL matches if both subjects and objects match.
- if (matches(request.usernames(), acl.usernames()) &&
- matches(request.ips(), acl.ips()) &&
- matches(request.hostnames(), acl.hostnames()) &&
- matches(request.urls(), acl.urls())) {
- // ACL is allowed if both subjects and objects are allowed.
- return allows(request.usernames(), acl.usernames()) &&
- allows(request.ips(), acl.ips()) &&
- allows(request.hostnames(), acl.hostnames()) &&
- allows(request.urls(), acl.urls());
- }
- }
-
- return acls.permissive(); // None of the ACLs match.
- }
-private:
- // Match matrix:
- //
- // -----------ACL----------
- //
- // SOME NONE ANY
- // -------|-------|-------|-------
- // | SOME | Yes/No| Yes | Yes
- // | -------|-------|-------|-------
- // Request NONE | No | Yes | No
- // | -------|-------|-------|-------
- // | ANY | No | Yes | Yes
- // -------|-------|-------|-------
- bool matches(const ACL::Entity& request, const ACL::Entity& acl)
- {
- // NONE only matches with NONE.
- if (request.type() == ACL::Entity::NONE) {
- return acl.type() == ACL::Entity::NONE;
- }
-
- // ANY matches with ANY or NONE.
- if (request.type() == ACL::Entity::ANY) {
- return acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE;
- }
-
- if (request.type() == ACL::Entity::SOME) {
- // SOME matches with ANY or NONE.
- if (acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE) {
- return true;
- }
-
- // SOME is allowed if the request values are a subset of ACL
- // values.
- foreach (const std::string& value, request.values()) {
- bool found = false;
- foreach (const std::string& value_, acl.values()) {
- if (value == value_) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- return false;
- }
- }
- return true;
- }
-
- return false;
- }
-
- // Allow matrix:
- //
- // -----------ACL----------
- //
- // SOME NONE ANY
- // -------|-------|-------|-------
- // | SOME | Yes/No| No | Yes
- // | -------|-------|-------|-------
- // Request NONE | No | Yes | No
- // | -------|-------|-------|-------
- // | ANY | No | No | Yes
- // -------|-------|-------|-------
- bool allows(const ACL::Entity& request, const ACL::Entity& acl)
- {
- // NONE is only allowed by NONE.
- if (request.type() == ACL::Entity::NONE) {
- return acl.type() == ACL::Entity::NONE;
- }
-
- // ANY is only allowed by ANY.
- if (request.type() == ACL::Entity::ANY) {
- return acl.type() == ACL::Entity::ANY;
- }
-
- if (request.type() == ACL::Entity::SOME) {
- // SOME is allowed by ANY.
- if (acl.type() == ACL::Entity::ANY) {
- return true;
- }
-
- // SOME is not allowed by NONE.
- if (acl.type() == ACL::Entity::NONE) {
- return false;
- }
-
- // SOME is allowed if the request values are a subset of ACL
- // values.
- foreach (const std::string& value, request.values()) {
- bool found = false;
- foreach (const std::string& value_, acl.values()) {
- if (value == value_) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- return false;
- }
- }
- return true;
- }
-
- return false;
- }
-
- ACLs acls;
-};
-
-
-Try<process::Owned<Authorizer> > Authorizer::create(const JSON::Object& acls_)
-{
- // Convert ACLs from JSON to Protobuf.
- Try<ACLs> acls = protobuf::parse<ACLs>(acls_);
- if (acls.isError()) {
- return Error("Invalid ACLs format: " + acls.error());
- }
-
- Try<process::Owned<LocalAuthorizer> > authorizer =
- LocalAuthorizer::create(acls.get());
-
- if (authorizer.isError()) {
- return Error(authorizer.error());
- }
-
- process::Owned<LocalAuthorizer> authorizer_ = authorizer.get();
- return static_cast<Authorizer*>(authorizer_.release());
-}
-
-
-LocalAuthorizer::LocalAuthorizer(const ACLs& acls)
-{
- process = new LocalAuthorizerProcess(acls);
- process::spawn(process);
-}
-
-
-LocalAuthorizer::~LocalAuthorizer()
-{
- process::terminate(process);
- process::wait(process);
- delete process;
-}
-
-
-Try<process::Owned<LocalAuthorizer> > LocalAuthorizer::create(const ACLs& acls)
-{
- // Validate ACLs.
- foreach (const ACL::HTTPGet& acl, acls.http_get()) {
- // At least one of the subjects should be set.
- if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
- return Error("At least one of the subjects should be set for ACL: " +
- acl.DebugString());
- }
- }
-
- foreach (const ACL::HTTPPut& acl, acls.http_put()) {
- // At least one of the subjects should be set.
- if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
- return Error("At least one of the subjects should be set for ACL: " +
- acl.DebugString());
- }
- }
-
- return new LocalAuthorizer(acls);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
- const ACL::RunTasks& request)
-{
- // Necessary to disambiguate.
- typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
- const ACL::RunTasks&);
-
- return process::dispatch(
- process,
- static_cast<F>(&LocalAuthorizerProcess::authorize),
- request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
- const ACL::ReceiveOffers& request)
-{
- // Necessary to disambiguate.
- typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
- const ACL::ReceiveOffers&);
-
- return process::dispatch(
- process,
- static_cast<F>(&LocalAuthorizerProcess::authorize),
- request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
- const ACL::HTTPGet& request)
-{
- // Necessary to disambiguate.
- typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
- const ACL::HTTPGet&);
-
- return process::dispatch(
- process,
- static_cast<F>(&LocalAuthorizerProcess::authorize),
- request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
- const ACL::HTTPPut& request)
-{
- // Necessary to disambiguate.
- typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
- const ACL::HTTPPut&);
-
- return process::dispatch(
- process,
- static_cast<F>(&LocalAuthorizerProcess::authorize),
- request);
-}
-
} // namespace internal {
} // namespace mesos {
[6/7] git commit: Added MockAuthorizer and more authorization tests.
Posted by vi...@apache.org.
Added MockAuthorizer and more authorization tests.
Review: https://reviews.apache.org/r/22190
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4084554
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4084554
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4084554
Branch: refs/heads/vinod/authorize_tasks
Commit: c4084554e8245397c168d44d0e54a1c2491ca3a8
Parents: a6c4ee7
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 2 22:20:11 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/tests/cluster.hpp | 44 ++-
src/tests/master_authorization_tests.cpp | 396 +++++++++++++++++++++++++-
src/tests/mesos.cpp | 24 ++
src/tests/mesos.hpp | 46 +++
4 files changed, 502 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 449165c..1c96ee7 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -98,12 +98,25 @@ public:
Try<process::PID<master::Master> > start(
const master::Flags& flags = master::Flags());
+ // Start and manage a new master using the specified allocator
+ // process.
+ Try<process::PID<master::Master> > start(
+ master::allocator::AllocatorProcess* allocatorProcess,
+ const master::Flags& flags = master::Flags());
+
+ // Start and manage a new master using the specified authorizer.
+ Try<process::PID<master::Master> > start(
+ Authorizer* authorizer,
+ const master::Flags& flags = master::Flags());
+
// Start and manage a new master using the specified flags.
- // An allocator process may be specified in which case it will outlive
- // the launched master. If no allocator process is specified then
- // the default allocator will be instantiated.
+ // An allocator process or authorizer may be specified in which
+ // case it will outlive the launched master. If either allocator
+ // process or authorizer is not specified then the default
+ // allocator or authorizer will be used.
Try<process::PID<master::Master> > start(
const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
+ const Option<Authorizer*>& authorizer,
const master::Flags& flags = master::Flags());
// Stops and cleans up a master at the specified PID.
@@ -272,12 +285,29 @@ inline void Cluster::Masters::shutdown()
inline Try<process::PID<master::Master> > Cluster::Masters::start(
const master::Flags& flags)
{
- return start(None(), flags);
+ return start(None(), None(), flags);
+}
+
+
+inline Try<process::PID<master::Master> > Cluster::Masters::start(
+ master::allocator::AllocatorProcess* allocator,
+ const master::Flags& flags)
+{
+ return start(allocator, None(), flags);
+}
+
+
+inline Try<process::PID<master::Master> > Cluster::Masters::start(
+ Authorizer* authorizer,
+ const master::Flags& flags)
+{
+ return start(None(), authorizer, flags);
}
inline Try<process::PID<master::Master> > Cluster::Masters::start(
const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
+ const Option<Authorizer*>& authorizer,
const master::Flags& flags)
{
// Disallow multiple masters when not using ZooKeeper.
@@ -354,7 +384,9 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
master.detector = new StandaloneMasterDetector();
}
- if (flags.acls.isSome()) {
+ if (authorizer.isSome()) {
+ CHECK_NOTNULL(authorizer.get());
+ } else if (flags.acls.isSome()) {
Try<process::Owned<Authorizer> > authorizer_ =
Authorizer::create(flags.acls.get());
if (authorizer_.isError()) {
@@ -372,7 +404,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
&cluster->files,
master.contender,
master.detector,
- master.authorizer,
+ authorizer.isSome() ? authorizer : master.authorizer,
flags);
if (url.isNone()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 17debaf..3a28ca2 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -31,6 +31,7 @@
#include <stout/gtest.hpp>
#include <stout/try.hpp>
+#include "master/allocator.hpp"
#include "master/master.hpp"
#include "messages/messages.hpp"
@@ -45,16 +46,21 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
+using mesos::internal::master::allocator::AllocatorProcess;
+
using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
using process::PID;
+using process::Promise;
using std::vector;
using testing::_;
+using testing::An;
using testing::AtMost;
+using testing::DoAll;
using testing::Return;
@@ -64,9 +70,8 @@ class MasterAuthorizationTest : public MesosTest {};
// This test verifies that an authorized task launch is successful.
TEST_F(MasterAuthorizationTest, AuthorizedTask)
{
- // Setup ACLs so that the framework can only launch tasks as "foo".
+ // Setup ACLs so that the framework can launch tasks as "foo".
ACLs acls;
- acls.set_permissive(false);
mesos::ACL::RunTasks* acl = acls.add_run_tasks();
acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
acl->mutable_users()->add_values("foo");
@@ -207,3 +212,390 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTask)
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
+
+
+// This test verifies that a 'killTask()' that comes before
+// '_launchTasks()' is called results in TASK_KILLED.
+TEST_F(MasterAuthorizationTest, KillTask)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Now kill the task.
+ driver.killTask(task.task_id());
+
+ // Framework should get a TASK_KILLED right away.
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_KILLED, status.get().state());
+
+ Future<Nothing> resourcesUnused =
+ FUTURE_DISPATCH(_, &AllocatorProcess::resourcesUnused);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // No task launch should happen resulting in all resources being
+ // returned to the allocator.
+ AWAIT_READY(resourcesUnused);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a slave removal that comes before
+// '_launchTasks()' is called results in TASK_LOST.
+TEST_F(MasterAuthorizationTest, SlaveRemoved)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ // Now stop the slave.
+ Stop(slave.get());
+
+ AWAIT_READY(slaveLost);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> resourcesRecovered =
+ FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // Framework should get a TASK_LOST.
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ // No task launch should happen resulting in all resources being
+ // returned to the allocator.
+ AWAIT_READY(resourcesRecovered);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a slave disconnection that comes before
+// '_launchTasks()' is called results in TASK_LOST.
+TEST_F(MasterAuthorizationTest, SlaveDisconnected)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ // Create a checkpointing slave so that a disconnected slave is not
+ // immediately removed.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true;
+ Try<PID<Slave> > slave = StartSlave(&exec, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .Times(AtMost(1));
+
+ Future<Nothing> slaveDisconnected =
+ FUTURE_DISPATCH(_, &AllocatorProcess::slaveDisconnected);
+
+ // Now stop the slave.
+ Stop(slave.get());
+
+ AWAIT_READY(slaveDisconnected);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> resourcesRecovered =
+ FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // Framework should get a TASK_LOST.
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ // No task launch should happen resulting in all resources being
+ // returned to the allocator.
+ AWAIT_READY(resourcesRecovered);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a framework removal that comes before
+// '_launchTasks()' is called results in recovery of resources.
+TEST_F(MasterAuthorizationTest, FrameworkRemoved)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ Future<Nothing> frameworkRemoved =
+ FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+ // Now stop the framework.
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(frameworkRemoved);
+
+ Future<Nothing> resourcesRecovered =
+ FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+ // Now complete authorization.
+ promise.set(true);
+
+ // No task launch should happen resulting in all resources being
+ // returned to the allocator.
+ AWAIT_READY(resourcesRecovered);
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a reconciliation request that comes before
+// '_launchTasks()' is ignored.
+TEST_F(MasterAuthorizationTest, ReconcileTask)
+{
+ MockAuthorizer authorizer;
+ Try<PID<Master> > master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Return a pending future from authorizer.
+ Future<Nothing> future;
+ Promise<bool> promise;
+ EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillOnce(DoAll(FutureSatisfy(&future),
+ Return(promise.future())));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait until authorization is in progress.
+ AWAIT_READY(future);
+
+ // Scheduler shouldn't get an update from reconciliation.
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(0);
+
+ Future<ReconcileTasksMessage> reconcileTasksMessage =
+ FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _);
+
+ vector<TaskStatus> statuses;
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+ status.set_state(TASK_STAGING);
+
+ statuses.push_back(status);
+
+ driver.reconcileTasks(statuses);
+
+ AWAIT_READY(reconcileTasksMessage);
+
+ // Make sure the framework doesn't receive any update.
+ Clock::pause();
+ Clock::settle();
+
+ // Now stop the framework.
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index ea6a1c0..e6d807c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -27,6 +27,8 @@
#include <stout/stringify.hpp>
#include <stout/uuid.hpp>
+#include "authorizer/authorizer.hpp"
+
#ifdef __linux__
#include "linux/cgroups.hpp"
#endif
@@ -199,6 +201,28 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
}
+Try<process::PID<master::Master> > MesosTest::StartMaster(
+ Authorizer* authorizer,
+ const Option<master::Flags>& flags,
+ bool wait)
+{
+ Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
+
+ Try<process::PID<master::Master> > master = cluster.masters.start(
+ authorizer, flags.isNone() ? CreateMasterFlags() : flags.get());
+
+ // Wait until the leader is detected because otherwise this master
+ // may reject authentication requests because it doesn't know it's
+ // the leader yet [MESOS-881].
+ if (wait && master.isSome() && !detected.await(Seconds(10))) {
+ return Error("Failed to wait " + stringify(Seconds(10)) +
+ " for master to detect the leader");
+ }
+
+ return master;
+}
+
+
Try<process::PID<slave::Slave> > MesosTest::StartSlave(
const Option<slave::Flags>& flags)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 2e01e5e..0b9b2f9 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -41,6 +41,8 @@
#include <stout/try.hpp>
#include <stout/uuid.hpp>
+#include "authorizer/authorizer.hpp"
+
#include "messages/messages.hpp" // For google::protobuf::Message.
#include "master/allocator.hpp"
@@ -101,6 +103,14 @@ protected:
const Option<master::Flags>& flags = None(),
bool wait = true);
+ // Starts a master with the specified authorizer and flags.
+ // Waits for the master to detect a leader (could be itself) before
+ // returning if 'wait' is set to true.
+ virtual Try<process::PID<master::Master> > StartMaster(
+ Authorizer* authorizer,
+ const Option<master::Flags>& flags = None(),
+ bool wait = true);
+
// Starts a slave with the specified flags.
virtual Try<process::PID<slave::Slave> > StartSlave(
const Option<slave::Flags>& flags = None());
@@ -469,6 +479,42 @@ public:
};
+// Definition of a MockAuthorizer that can be used in tests with gmock.
+class MockAuthorizer : public Authorizer
+{
+public:
+ MockAuthorizer()
+ {
+ using ::testing::An;
+ using ::testing::Return;
+
+ // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+ // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
+ // for more details.
+ EXPECT_CALL(*this, authorize(An<const mesos::ACL::RunTasks&>()))
+ .WillRepeatedly(Return(true));
+
+ EXPECT_CALL(*this, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+ .WillRepeatedly(Return(true));
+
+ EXPECT_CALL(*this, authorize(An<const mesos::ACL::HTTPGet&>()))
+ .WillRepeatedly(Return(true));
+
+ EXPECT_CALL(*this, authorize(An<const mesos::ACL::HTTPPut&>()))
+ .WillRepeatedly(Return(true));
+ }
+
+ MOCK_METHOD1(
+ authorize, process::Future<bool>(const ACL::RunTasks& request));
+ MOCK_METHOD1(
+ authorize, process::Future<bool>(const ACL::ReceiveOffers& request));
+ MOCK_METHOD1(
+ authorize, process::Future<bool>(const ACL::HTTPGet& request));
+ MOCK_METHOD1(
+ authorize, process::Future<bool>(const ACL::HTTPPut& request));
+};
+
+
template <typename T = master::allocator::AllocatorProcess>
class MockAllocatorProcess : public master::allocator::AllocatorProcess
{
[4/7] git commit: Fixed a bug in scheduler driver to properly erase
'savedOffers'.
Posted by vi...@apache.org.
Fixed a bug in scheduler driver to properly erase 'savedOffers'.
Review: https://reviews.apache.org/r/22148
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd94a1fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd94a1fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd94a1fe
Branch: refs/heads/vinod/authorize_tasks
Commit: dd94a1fe9aff281f49d61bd8c214f41fcb340b04
Parents: 23a25e7
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu May 29 15:32:03 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd94a1fe/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9459c9c..6e14f1c 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -890,7 +890,7 @@ protected:
foreach (const TaskInfo& task, result) {
// Keep only the slave PIDs where we run tasks so we can send
// framework messages directly.
- if (savedOffers.count(offerId) > 0) {
+ if (savedOffers.contains(offerId)) {
if (savedOffers[offerId].count(task.slave_id()) > 0) {
savedSlavePids[task.slave_id()] =
savedOffers[offerId][task.slave_id()];
@@ -902,10 +902,9 @@ protected:
LOG(WARNING) << "Attempting to launch task " << task.task_id()
<< " with an unknown offer " << offerId;
}
-
- // Remove the offer since we saved all the PIDs we might use.
- savedOffers.erase(offerId);
}
+ // Remove the offer since we saved all the PIDs we might use.
+ savedOffers.erase(offerId);
}
foreach (const TaskInfo& task, result) {
[7/7] git commit: Changed TaskInfoError and OfferError to
Option<Error>.
Posted by vi...@apache.org.
Changed TaskInfoError and OfferError to Option<Error>.
Review: https://reviews.apache.org/r/22338
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b73ea627
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b73ea627
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b73ea627
Branch: refs/heads/vinod/authorize_tasks
Commit: b73ea62753bb4e7301edb4691d0b85c3be490fb5
Parents: b508516
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 6 15:45:33 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 57 ++++++++++++++++++++++------------------------
1 file changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b73ea627/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7884aa4..df75c8a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -34,6 +34,7 @@
#include <process/metrics/metrics.hpp>
#include <stout/check.hpp>
+#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/memory.hpp>
#include <stout/multihashmap.hpp>
@@ -1375,11 +1376,9 @@ void Master::resourceRequest(
// back to the framework for only that task description. An instance
// will be reused for each task description from same 'launchTasks()',
// but not for task descriptions from different offers.
-typedef Option<string> TaskInfoError;
-
struct TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1391,7 +1390,7 @@ struct TaskInfoVisitor
// Checks that a task id is valid, i.e., contains only valid characters.
struct TaskIDChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1416,7 +1415,7 @@ struct TaskIDChecker : TaskInfoVisitor
// Checks that the slave ID used by a task is correct.
struct SlaveIDChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1438,7 +1437,7 @@ struct SlaveIDChecker : TaskInfoVisitor
// task tries to re-use an ID.
struct UniqueTaskIDChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1464,7 +1463,7 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
// offered on that slave
struct ResourceUsageChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1531,7 +1530,7 @@ struct ResourceUsageChecker : TaskInfoVisitor
// ExecutorID) have an identical ExecutorInfo.
struct ExecutorInfoChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1572,7 +1571,7 @@ struct ExecutorInfoChecker : TaskInfoVisitor
// launched on a slave that has not enabled checkpointing.
struct CheckpointChecker : TaskInfoVisitor
{
- virtual TaskInfoError operator () (
+ virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
@@ -1592,11 +1591,9 @@ struct CheckpointChecker : TaskInfoVisitor
// The error reporting scheme is also similar to TaskInfoVisitor.
// However, offer processing (and subsequent task processing) is
// aborted altogether if offer visitor reports an error.
-typedef Option<string> OfferError;
-
struct OfferVisitor
{
- virtual OfferError operator () (
+ virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master) = 0;
@@ -1619,14 +1616,14 @@ struct OfferVisitor
// Checks validity/liveness of an offer.
struct ValidOfferChecker : OfferVisitor {
- virtual OfferError operator () (
+ virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
Offer* offer = getOffer(master, offerId);
if (offer == NULL) {
- return "Offer " + stringify(offerId) + " is no longer valid";
+ return Error("Offer " + stringify(offerId) + " is no longer valid");
}
return None();
@@ -1636,14 +1633,14 @@ struct ValidOfferChecker : OfferVisitor {
// Checks that an offer belongs to the expected framework.
struct FrameworkChecker : OfferVisitor {
- virtual OfferError operator () (
+ virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
Offer* offer = getOffer(master, offerId);
if (offer == NULL) {
- return "Offer " + stringify(offerId) + " is no longer valid";
+ return Error("Offer " + stringify(offerId) + " is no longer valid");
}
if (!(framework.id == offer->framework_id())) {
@@ -1661,7 +1658,7 @@ struct FrameworkChecker : OfferVisitor {
// the same slave.
struct SlaveChecker : OfferVisitor
{
- virtual OfferError operator () (
+ virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
@@ -1703,7 +1700,7 @@ struct SlaveChecker : OfferVisitor
// Checks that an offer only appears once in offer list.
struct UniqueOfferIDChecker : OfferVisitor
{
- virtual OfferError operator () (
+ virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
@@ -1782,17 +1779,17 @@ void Master::launchTasks(
// Verify and aggregate all offers.
// Abort offer and task processing if any offer validation failed.
Resources totalResources;
- OfferError offerError = None();
+ Option<Error> error = None();
foreach (const OfferID& offerId, offerIds) {
foreach (OfferVisitor* visitor, offerVisitors) {
- offerError = (*visitor)(offerId, *framework, this);
- if (offerError.isSome()) {
+ error = (*visitor)(offerId, *framework, this);
+ if (error.isSome()) {
break;
}
}
// Offer validation error needs to be propagated from visitor
// loop above.
- if (offerError.isSome()) {
+ if (error.isSome()) {
break;
}
@@ -1819,7 +1816,7 @@ void Master::launchTasks(
foreach (const OfferID& offerId, offerIds) {
Offer* offer = getOffer(offerId);
if (offer != NULL) {
- if (offerError.isSome()) {
+ if (error.isSome()) {
allocator->resourcesRecovered(
offer->framework_id(), offer->slave_id(), offer->resources());
}
@@ -1827,9 +1824,9 @@ void Master::launchTasks(
}
}
- if (offerError.isSome()) {
- LOG(WARNING) << "Failed to validate offers " << stringify(offerIds)
- << ": " << offerError.get();
+ if (error.isSome()) {
+ LOG(WARNING) << "Failed to validate offer " << stringify(offerIds)
+ << ": " << error.get().message;
foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -1837,7 +1834,7 @@ void Master::launchTasks(
task.slave_id(),
task.task_id(),
TASK_LOST,
- "Task launched with invalid offers: " + offerError.get());
+ "Task launched with invalid offers: " + error.get().message);
LOG(INFO) << "Sending status update " << update
<< " for launch task attempt on invalid offers: "
@@ -1873,7 +1870,7 @@ void Master::launchTasks(
// Loop through each task and check its validity.
foreach (const TaskInfo& task, tasks) {
// Possible error found while checking task's validity.
- TaskInfoError error = None();
+ Option<Error> error = None();
// Invoke each visitor.
foreach (TaskInfoVisitor* visitor, taskVisitors) {
@@ -1889,14 +1886,14 @@ void Master::launchTasks(
} else {
// Error validating task, send a failed status update.
LOG(WARNING) << "Failed to validate task " << task.task_id()
- << " : " << error.get();
+ << " : " << error.get().message;
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
slave->id,
task.task_id(),
TASK_LOST,
- error.get());
+ error.get().message);
LOG(INFO) << "Sending status update "
<< update << " for invalid task";