You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/11 01:33:46 UTC

[1/7] git commit: Authorized resource roles to receive offers from.

Repository: mesos
Updated Branches:
  refs/heads/vinod/authorize_tasks [created] 57f352e20


Authorized resource roles to receive offers from.

Review: https://reviews.apache.org/r/22284


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57f352e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57f352e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57f352e2

Branch: refs/heads/vinod/authorize_tasks
Commit: 57f352e20a85e077321bdc9973f33d0464917146
Parents: c408455
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 4 16:08:30 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/master/flags.hpp                     |   3 +-
 src/master/master.cpp                    | 154 ++++++++++--
 src/master/master.hpp                    |  25 +-
 src/tests/master_authorization_tests.cpp | 347 ++++++++++++++++++++++++++
 4 files changed, 495 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index 4863359..ed08c6b 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -175,9 +175,10 @@ public:
         "Human readable name for the cluster,\n"
         "displayed in the webui.");
 
+    // TODO(vinod): Deprecate this in favor of '--acls'.
     add(&Flags::roles,
         "roles",
-        "A comma seperated list of the allocation\n"
+        "A comma separated list of the allocation\n"
         "roles that frameworks in this cluster may\n"
         "belong to.");
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1a2b219..4a01b1a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -990,7 +990,18 @@ void Master::detected(const Future<Option<MasterInfo> >& _leader)
 }
 
 
-Try<Nothing> Master::validate(
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+  if (authorized) {
+    return None();
+  }
+
+  return Error(message);
+}
+
+
+Future<Option<Error> > Master::validate(
     const FrameworkInfo& frameworkInfo,
     const UPID& from)
 {
@@ -1016,11 +1027,33 @@ Try<Nothing> Master::validate(
     }
   }
 
+  // TODO(vinod): Deprecate this in favor of ACLs.
   if (!roles.contains(frameworkInfo.role())) {
-    return Error("Role '" + frameworkInfo.role() + "' is not valid.");
+    return Error("Role '" + frameworkInfo.role() + "' is invalid");
   }
 
-  return Nothing();
+  if (authorizer.isNone()) {
+    // Authorization is disabled.
+    return None();
+  }
+
+  LOG(INFO)
+    << "Authorizing framework principal '" << frameworkInfo.principal()
+    << "' to receive offers for role '" << frameworkInfo.role() << "'";
+
+  mesos::ACL::ReceiveOffers request;
+  if (frameworkInfo.has_principal()) {
+    request.mutable_principals()->add_values(frameworkInfo.principal());
+  } else {
+    // Framework doesn't have a principal set.
+    request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  }
+  request.mutable_roles()->add_values(frameworkInfo.role());
+
+  return authorizer.get()->authorize(request).then(
+      lambda::bind(&_authorize,
+                   "Not authorized to use role '" + frameworkInfo.role() + "'",
+                   lambda::_1));
 }
 
 
@@ -1031,6 +1064,11 @@ void Master::registerFramework(
   ++metrics.messages_register_framework;
 
   if (authenticating.contains(from)) {
+    // TODO(vinod): Consider dropping this request and fix the tests
+    // to deal with the drop. Currently there is a race between master
+    // realizing the framework is authenticated and framework sending
+    // a registration request. Dropping this message will cause the
+    // framework to retry, slowing down the tests.
     LOG(INFO) << "Queuing up registration request from " << from
               << " because authentication is still in progress";
 
@@ -1039,17 +1077,48 @@ void Master::registerFramework(
     return;
   }
 
-  Try<Nothing> valid = validate(frameworkInfo, from);
-  if (valid.isError()) {
-    LOG(WARNING) << "Refusing registration of framework at " << from  << ": "
-                 << valid.error();
+  LOG(INFO) << "Received registration request from " << from;
+
+  validate(frameworkInfo, from)
+    .onAny(defer(self(),
+                 &Master::_registerFramework,
+                 from,
+                 frameworkInfo,
+                 lambda::_1));
+}
+
+
+void Master::_registerFramework(
+    const UPID& from,
+    const FrameworkInfo& frameworkInfo,
+    const Future<Option<Error> >& validationError)
+{
+  CHECK_READY(validationError);
+  if (validationError.get().isSome()) {
+    LOG(INFO) << "Refusing registration of framework at " << from  << ": "
+              << validationError.get().get().message;
+
     FrameworkErrorMessage message;
-    message.set_message(valid.error());
+    message.set_message(validationError.get().get().message);
     send(from, message);
     return;
   }
 
-  LOG(INFO) << "Received registration request from " << from;
+  if (authenticating.contains(from)) {
+    // This could happen if a new authentication request came from the
+    // same framework while validation was in progress.
+    LOG(INFO) << "Dropping registration request from " << from
+              << " because new authentication attempt is in progress";
+    return;
+  }
+
+  if (flags.authenticate_frameworks && !authenticated.contains(from)) {
+    // This could happen if another (failed over) framework
+    // authenticated while validation was in progress.
+    LOG(INFO) << "Dropping registration request from " << from
+              << " because it is not authenticated";
+    return;
+  }
 
   // Check if this framework is already registered (because it retries).
   foreachvalue (Framework* framework, frameworks.activated) {
@@ -1069,6 +1138,7 @@ void Master::registerFramework(
 
   LOG(INFO) << "Registering framework " << framework->id << " at " << from;
 
+  // TODO(vinod): Deprecate this in favor of authorization.
   bool rootSubmissions = flags.root_submissions;
 
   if (framework->info.user() == "root" && rootSubmissions == false) {
@@ -1095,7 +1165,9 @@ void Master::reregisterFramework(
   if (authenticating.contains(from)) {
     LOG(INFO) << "Queuing up re-registration request from " << from
               << " because authentication is still in progress";
-
+    // TODO(vinod): Consider dropping this request and fix the tests
+    // to deal with the drop. See 'Master::registerFramework()' for
+    // more details.
     authenticating[from]
       .onReady(defer(self(),
                      &Self::reregisterFramework,
@@ -1113,16 +1185,57 @@ void Master::reregisterFramework(
     return;
   }
 
-  Try<Nothing> valid = validate(frameworkInfo, from);
-  if (valid.isError()) {
-    LOG(WARNING) << "Refusing re-registration of framework at " << from << ": "
-                 << valid.error();
+  LOG(INFO) << "Received re-registration request from framework "
+            << frameworkInfo.id() << " at " << from;
+
+  validate(frameworkInfo, from)
+    .onAny(defer(self(),
+                 &Master::_reregisterFramework,
+                 from,
+                 frameworkInfo,
+                 failover,
+                 lambda::_1));
+}
+
+
+void Master::_reregisterFramework(
+    const UPID& from,
+    const FrameworkInfo& frameworkInfo,
+    bool failover,
+    const Future<Option<Error> >& validationError)
+{
+  CHECK_READY(validationError);
+  if (validationError.get().isSome()) {
+    LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id()
+              << " at " << from << ": " << validationError.get().get().message;
+
     FrameworkErrorMessage message;
-    message.set_message(valid.error());
+    message.set_message(validationError.get().get().message);
     send(from, message);
     return;
   }
 
+  if (authenticating.contains(from)) {
+    // This could happen if a new authentication request came from the
+    // same framework while validation was in progress.
+    LOG(INFO) << "Dropping re-registration request of framework "
+              << frameworkInfo.id() << " at " << from
+              << " because new authentication attempt is in progress";
+    return;
+  }
+
+  if (flags.authenticate_frameworks && !authenticated.contains(from)) {
+    // This could happen if another (failed over) framework
+    // authenticated while validation was in progress. It is important
+    // to drop this because if this request is from a failing over
+    // framework (pid = from) we don't want to failover the already
+    // registered framework (pid = framework->pid).
+    LOG(INFO) << "Dropping re-registration request of framework "
+              << frameworkInfo.id() << " at " << from
+              << " because it is not authenticated";
+    return;
+  }
+
   LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
             << " at " << from;
 
@@ -1879,17 +1992,6 @@ void Master::launchTasks(
 }
 
 
-// Helper to convert authorization result to Future<Option<Error> >.
-static Future<Option<Error> > _authorize(const string& message, bool authorized)
-{
-  if (authorized) {
-    return None();
-  }
-
-  return Error(message);
-}
-
-
 Future<Option<Error> > Master::validateTask(
     const TaskInfo& task,
     Framework* framework,

http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0c68a5b..7a12185 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -275,6 +275,19 @@ protected:
       const std::vector<ExecutorInfo>& executors,
       const std::vector<Task>& tasks);
 
+  // 'registerFramework()' continuation.
+  void _registerFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      const process::Future<Option<Error> >& validationError);
+
+  // 'reregisterFramework()' continuation.
+  void _reregisterFramework(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      bool failover,
+      const process::Future<Option<Error> >& validationError);
+
   // Add a framework.
   void addFramework(Framework* framework);
 
@@ -635,13 +648,11 @@ private:
 
   Option<process::Time> electedTime; // Time when this master is elected.
 
-  // Helper method for common FrameworkInfo and PID validation shared
-  // by Framework registration and re-registration.
-  // An error return value indicates the request is invalid and a
-  // FrameworkErrorMessage should be returned.
-  // Note that registration/re-registration specific checking is not
-  // handled here.
-  Try<Nothing> validate(
+  // Validates the framework including authorization.
+  // Returns None if the framework is valid.
+  // Returns Error if the framework is invalid.
+  // Returns Failure if authorization returns 'Failure'.
+  process::Future<Option<Error> > validate(
       const FrameworkInfo& frameworkInfo,
       const process::UPID& from);
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/57f352e2/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 3a28ca2..661d67e 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -599,3 +599,350 @@ TEST_F(MasterAuthorizationTest, ReconcileTask)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// This test verifies that a framework registration with an authorized
+// role is successful.
+TEST_F(MasterAuthorizationTest, AuthorizedRole)
+{
+  // Setup ACLs so that the framework can receive offers for role
+  // "foo".
+  ACLs acls;
+  mesos::ACL::ReceiveOffers* acl = acls.add_receive_offers();
+  acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+  acl->mutable_roles()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.roles = "foo";
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that a framework registration with an unauthorized
+// role is denied.
+TEST_F(MasterAuthorizationTest, UnauthorizedRole)
+{
+  // Setup ACLs so that no framework can receive offers for role
+  // "foo".
+  ACLs acls;
+  mesos::ACL::ReceiveOffers* acl = acls.add_receive_offers();
+  acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+  acl->mutable_roles()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.roles = "foo";
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> error;
+  EXPECT_CALL(sched, error(&driver, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  driver.start();
+
+  // Framework should get error message from the master.
+  AWAIT_READY(error);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that an authentication request that comes from
+// the same instance of the framework (e.g., ZK blip) before
+// 'Master::_registerFramework()' from an earlier attempt, causes the
+// master to successfully register the framework.
+TEST_F(MasterAuthorizationTest, DuplicateRegistration)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  // Create a detector for the scheduler driver because we want the
+  // spurious leading master change to be known by the scheduler
+  // driver only.
+  StandaloneMasterDetector detector(master.get());
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Return pending futures from authorizer.
+  Future<Nothing> future1;
+  Promise<bool> promise1;
+  Future<Nothing> future2;
+  Promise<bool> promise2;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future1),
+                    Return(promise1.future())))
+    .WillOnce(DoAll(FutureSatisfy(&future2),
+                    Return(promise2.future())));
+
+  driver.start();
+
+  // Wait until first authorization attempt is in progress.
+  AWAIT_READY(future1);
+
+  // Simulate a spurious leading master change at the scheduler.
+  detector.appoint(master.get());
+
+  // Wait until second authorization attempt is in progress.
+  AWAIT_READY(future2);
+
+  // Now complete the first authorization attempt.
+  promise1.set(true);
+
+  // First registration request should succeed because the
+  // framework PID did not change.
+  AWAIT_READY(registered);
+
+  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
+
+  // Now complete the second authorization attempt.
+  promise2.set(true);
+
+  // Master should acknowledge the second registration attempt too.
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that an authentication request that comes from
+// the same instance of the framework (e.g., ZK blip) before
+// 'Master::_reregisterFramework()' from an earlier attempt, causes
+// the master to successfully re-register the framework.
+TEST_F(MasterAuthorizationTest, DuplicateReregistration)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  // Create a detector for the scheduler driver because we want the
+  // spurious leading master change to be known by the scheduler
+  // driver only.
+  StandaloneMasterDetector detector(master.get());
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Return pending futures from authorizer after the first attempt.
+  Future<Nothing> future2;
+  Promise<bool> promise2;
+  Future<Nothing> future3;
+  Promise<bool> promise3;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+    .WillOnce(Return(true))
+    .WillOnce(DoAll(FutureSatisfy(&future2),
+                    Return(promise2.future())))
+    .WillOnce(DoAll(FutureSatisfy(&future3),
+                    Return(promise3.future())));
+
+  driver.start();
+
+  // Wait for the framework to be registered.
+  AWAIT_READY(registered);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Simulate a spurious leading master change at the scheduler.
+  detector.appoint(master.get());
+
+  // Wait until the second authorization attempt is in progress.
+  AWAIT_READY(future2);
+
+  // Simulate another spurious leading master change at the scheduler.
+  detector.appoint(master.get());
+
+  // Wait until the third authorization attempt is in progress.
+  AWAIT_READY(future3);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(sched, reregistered(&driver, _))
+    .WillOnce(FutureSatisfy(&reregistered));
+
+  // Now complete the second authorization attempt.
+  promise2.set(true);
+
+  // First re-registration request should succeed because the
+  // framework PID did not change.
+  AWAIT_READY(reregistered);
+
+  Future<FrameworkReregisteredMessage> frameworkReregisteredMessage =
+    FUTURE_PROTOBUF(FrameworkReregisteredMessage(), _, _);
+
+  // Now complete the third authorization attempt.
+  promise3.set(true);
+
+  // Master should acknowledge the second re-registration attempt too.
+  AWAIT_READY(frameworkReregisteredMessage);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test ensures that a framework that is removed while
+// authorization for registration is in progress is properly handled.
+TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.start();
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  // Stop the framework.
+  // At this point the framework is disconnected but the master does
+  // not take any action because the framework is not in its map yet.
+  driver.stop();
+  driver.join();
+
+  // Settle the clock here to ensure master handles the framework
+  // 'exited' event.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // When the master tries to link to a non-existent framework PID
+  // it should realize the framework is gone and remove it.
+  AWAIT_READY(frameworkRemoved);
+
+  Shutdown();
+}
+
+
+// This test ensures that a framework that is removed while
+// authorization for re-registration is in progress is properly
+// handled.
+TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  // Create a detector for the scheduler driver because we want the
+  // spurious leading master change to be known by the scheduler
+  // driver only.
+  StandaloneMasterDetector detector(master.get());
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Return a pending future from authorizer after first attempt.
+  Future<Nothing> future2;
+  Promise<bool> promise2;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+    .WillOnce(Return(true))
+    .WillOnce(DoAll(FutureSatisfy(&future2),
+                    Return(promise2.future())));
+
+  driver.start();
+
+  // Wait until the framework is registered.
+  AWAIT_READY(registered);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Framework should not be re-registered.
+  EXPECT_CALL(sched, reregistered(&driver, _))
+    .Times(0);
+
+  // Simulate a spurious leading master change at the scheduler.
+  detector.appoint(master.get());
+
+  // Wait until the second authorization attempt is in progress.
+  AWAIT_READY(future2);
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  // Stop the framework.
+  driver.stop();
+  driver.join();
+
+  // Wait until the framework is removed.
+  AWAIT_READY(frameworkRemoved);
+
+  // Now complete the second authorization attempt.
+  promise2.set(true);
+
+  // Master should drop the second framework re-registration request
+  // because the framework PID was removed from 'authenticated' map.
+  // Settle the clock here to ensure 'Master::_reregisterFramework()'
+  // is executed.
+  Clock::pause();
+  Clock::settle();
+
+  Shutdown();
+}


[5/7] git commit: Injected Authorizer into Master.

Posted by vi...@apache.org.
Injected Authorizer into Master.

Review: https://reviews.apache.org/r/22150


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5085168
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5085168
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5085168

Branch: refs/heads/vinod/authorize_tasks
Commit: b5085168f2ada382df2eaf600a5349e21e2b38d3
Parents: 6267a0f
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat May 31 19:04:02 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/local/local.cpp   | 29 +++++++++++++++++++++++++++--
 src/master/main.cpp   | 21 +++++++++++++++++++++
 src/master/master.cpp | 14 ++++++--------
 src/master/master.hpp |  5 +++--
 src/tests/cluster.hpp | 22 +++++++++++++++++++++-
 5 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 5d26aff..e05a225 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -21,13 +21,17 @@
 #include <sstream>
 #include <vector>
 
+#include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/exit.hpp>
 #include <stout/foreach.hpp>
 #include <stout/path.hpp>
+#include <stout/try.hpp>
 #include <stout/strings.hpp>
 
+#include "authorizer/authorizer.hpp"
+
 #include "common/protobuf_utils.hpp"
 
 #include "local.hpp"
@@ -67,6 +71,7 @@ using mesos::internal::master::Repairer;
 using mesos::internal::slave::Containerizer;
 using mesos::internal::slave::Slave;
 
+using process::Owned;
 using process::PID;
 using process::UPID;
 
@@ -92,6 +97,7 @@ static Master* master = NULL;
 static map<Containerizer*, Slave*> slaves;
 static StandaloneMasterDetector* detector = NULL;
 static MasterContender* contender = NULL;
+static Option<Authorizer*> authorizer = None();
 static Files* files = NULL;
 
 
@@ -153,15 +159,29 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
 
     contender = new StandaloneMasterContender();
     detector = new StandaloneMasterDetector();
-    master =
-      new Master(
+
+    if (flags.acls.isSome()) {
+      Try<Owned<Authorizer> > authorizer_ =
+        Authorizer::create(flags.acls.get());
+
+      if (authorizer_.isError()) {
+        EXIT(1) << "Failed to initialize the authorizer: "
+                << authorizer_.error() << " (see --acls flag)";
+      }
+      Owned<Authorizer> authorizer__ = authorizer_.get();
+      authorizer = authorizer__.release();
+    }
+
+    master = new Master(
         _allocator,
         registrar,
         repairer,
         files,
         contender,
         detector,
+        authorizer,
         flags);
+
     detector->appoint(master->info());
   }
 
@@ -222,6 +242,11 @@ void shutdown()
 
     slaves.clear();
 
+    if (authorizer.isSome()) {
+      delete authorizer.get();
+      authorizer = None();
+    }
+
     delete detector;
     detector = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 8ceaae6..68cd56b 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -22,18 +22,22 @@
 
 #include <mesos/mesos.hpp>
 
+#include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/check.hpp>
 #include <stout/exit.hpp>
 #include <stout/flags.hpp>
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 
+#include "authorizer/authorizer.hpp"
+
 #include "common/build.hpp"
 #include "common/protobuf_utils.hpp"
 
@@ -64,6 +68,7 @@ using namespace zookeeper;
 
 using mesos::MasterInfo;
 
+using process::Owned;
 using process::UPID;
 
 using std::cerr;
@@ -243,6 +248,17 @@ int main(int argc, char** argv)
   }
   detector = detector_.get();
 
+  Option<Authorizer*> authorizer = None();
+  if (flags.acls.isSome()) {
+    Try<Owned<Authorizer> > authorizer_ = Authorizer::create(flags.acls.get());
+    if (authorizer_.isError()) {
+      EXIT(1) << "Failed to initialize the authorizer: "
+              << authorizer_.error() << " (see --acls flag)";
+    }
+    Owned<Authorizer> authorizer__ = authorizer_.get();
+    authorizer = authorizer__.release();
+  }
+
   LOG(INFO) << "Starting Mesos master";
 
   Master* master =
@@ -253,6 +269,7 @@ int main(int argc, char** argv)
       &files,
       contender,
       detector,
+      authorizer,
       flags);
 
   if (zk.isNone()) {
@@ -277,5 +294,9 @@ int main(int argc, char** argv)
   delete contender;
   delete detector;
 
+  if (authorizer.isSome()) {
+    delete authorizer.get();
+  }
+
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c18ccc4..7884aa4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -28,6 +28,7 @@
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
+#include <process/owned.hpp>
 #include <process/run.hpp>
 
 #include <process/metrics/metrics.hpp>
@@ -38,6 +39,7 @@
 #include <stout/multihashmap.hpp>
 #include <stout/nothing.hpp>
 #include <stout/numify.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
@@ -216,6 +218,7 @@ Master::Master(
     Files* _files,
     MasterContender* _contender,
     MasterDetector* _detector,
+    const Option<Authorizer*>& _authorizer,
     const Flags& _flags)
   : ProcessBase("master"),
     http(*this),
@@ -226,6 +229,7 @@ Master::Master(
     files(_files),
     contender(_contender),
     detector(_detector),
+    authorizer(_authorizer),
     metrics(*this),
     electedTime(None())
 {
@@ -336,14 +340,8 @@ void Master::initialize()
             << " (see --credentials flag)";
   }
 
-  if (flags.acls.isSome()) {
-    LOG(INFO) << "Master enabling authorization";
-    Try<Owned<Authorizer> > authorizer_ = Authorizer::create(flags.acls.get());
-    if (authorizer_.isError()) {
-      EXIT(1) << "Failed to initialize the Authorizer: "
-              << authorizer_.error() << " (see --acls flag)";
-    }
-    authorizer = authorizer_.get();
+  if (authorizer.isSome()) {
+    LOG(INFO) << "Authorization enabled";
   }
 
   hashmap<string, RoleInfo> roleInfos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 26af113..75f0d49 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -100,6 +100,7 @@ public:
          Files* files,
          MasterContender* contender,
          MasterDetector* detector,
+         const Option<Authorizer*>& authorizer,
          const Flags& flags = Flags());
 
   virtual ~Master();
@@ -401,6 +402,8 @@ private:
   MasterContender* contender;
   MasterDetector* detector;
 
+  const Option<Authorizer*> authorizer;
+
   MasterInfo info_;
 
   // Indicates when recovery is complete. Recovery begins once the
@@ -468,8 +471,6 @@ private:
   // Principals of authenticated frameworks/slaves keyed by PID.
   hashmap<process::UPID, std::string> authenticated;
 
-  Option<process::Owned<Authorizer> > authorizer;
-
   int64_t nextFrameworkId; // Used to give each framework a unique ID.
   int64_t nextOfferId;     // Used to give each slot offer a unique ID.
   int64_t nextSlaveId;     // Used to give each slave a unique ID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5085168/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index f4cc9a6..449165c 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -45,6 +45,8 @@
 #include "linux/cgroups.hpp"
 #endif // __linux__
 
+#include "authorizer/authorizer.hpp"
+
 #include "log/log.hpp"
 
 #include "log/tool/initialize.hpp"
@@ -131,7 +133,8 @@ public:
           registrar(NULL),
           repairer(NULL),
           contender(NULL),
-          detector(NULL) {}
+          detector(NULL),
+          authorizer(None()) {}
 
       master::Master* master;
       master::allocator::Allocator* allocator;
@@ -143,6 +146,7 @@ public:
       master::Repairer* repairer;
       MasterContender* contender;
       MasterDetector* detector;
+      Option<Authorizer*> authorizer;
     };
 
     std::map<process::PID<master::Master>, Master> masters;
@@ -350,6 +354,17 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
     master.detector = new StandaloneMasterDetector();
   }
 
+  if (flags.acls.isSome()) {
+    Try<process::Owned<Authorizer> > authorizer_ =
+      Authorizer::create(flags.acls.get());
+    if (authorizer_.isError()) {
+      return Error("Failed to initialize the authorizer: " +
+                   authorizer_.error() + " (see --acls flag)");
+    }
+    process::Owned<Authorizer> authorizer__ = authorizer_.get();
+    master.authorizer = authorizer__.release();
+  }
+
   master.master = new master::Master(
       master.allocator,
       master.registrar,
@@ -357,6 +372,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
       &cluster->files,
       master.contender,
       master.detector,
+      master.authorizer,
       flags);
 
   if (url.isNone()) {
@@ -408,6 +424,10 @@ inline Try<Nothing> Cluster::Masters::stop(
   delete master.contender;
   delete master.detector;
 
+  if (master.authorizer.isSome()) {
+    delete master.authorizer.get();
+  }
+
   masters.erase(pid);
 
   return Nothing();


[2/7] git commit: Authorized launch tasks.

Posted by vi...@apache.org.
Authorized launch tasks.

Review: https://reviews.apache.org/r/22151


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6c4ee70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6c4ee70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6c4ee70

Branch: refs/heads/vinod/authorize_tasks
Commit: a6c4ee70262380c3ecb3c43c8eee0108f6b224eb
Parents: b73ea62
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed May 28 16:45:22 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                               |   1 +
 src/common/protobuf_utils.hpp                 |   2 +
 src/master/hierarchical_allocator_process.hpp |   1 -
 src/master/master.cpp                         | 474 +++++++++++++++------
 src/master/master.hpp                         |  36 +-
 src/tests/master_authorization_tests.cpp      | 209 +++++++++
 6 files changed, 575 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1d49dca..c91b438 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -975,6 +975,7 @@ mesos_tests_SOURCES =				\
   tests/log_tests.cpp				\
   tests/logging_tests.cpp			\
   tests/main.cpp				\
+  tests/master_authorization_tests.cpp		\
   tests/master_contender_detector_tests.cpp	\
   tests/master_tests.cpp			\
   tests/mesos.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0f65341..12ff00a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -48,6 +48,8 @@ inline bool isTerminalState(const TaskState& state)
 }
 
 
+// TODO(vinod): Make SlaveID optional because 'StatusUpdate.SlaveID'
+// is optional.
 inline StatusUpdate createStatusUpdate(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 0c5e2e0..1765e70 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -553,7 +553,6 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused(
   // result of a valid task launch by an active
   // framework that doesn't use the entire offer.
   CHECK(frameworks.contains(frameworkId));
-
   const std::string& role = frameworks[frameworkId].role();
   sorters[role]->unallocated(frameworkId.value(), resources);
   sorters[role]->remove(resources);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index df75c8a..1a2b219 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -25,6 +25,8 @@
 #include <list>
 #include <sstream>
 
+#include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
@@ -68,6 +70,7 @@ using std::list;
 using std::string;
 using std::vector;
 
+using process::await;
 using process::wait; // Necessary on some OS's to disambiguate.
 using process::Clock;
 using process::Failure;
@@ -576,6 +579,9 @@ void Master::finalize()
   // allocator or the roles because it is unnecessary bookkeeping at
   // this point since we are shutting down.
   foreachvalue (Framework* framework, frameworks.activated) {
+    // Remove pending tasks from the framework.
+    framework->pendingTasks.clear();
+
     // Remove pointers to the framework's tasks in slaves.
     foreachvalue (Task* task, utils::copy(framework->tasks)) {
       Slave* slave = getSlave(task->slave_id());
@@ -1387,6 +1393,7 @@ struct TaskInfoVisitor
   virtual ~TaskInfoVisitor() {}
 };
 
+
 // Checks that a task id is valid, i.e., contains only valid characters.
 struct TaskIDChecker : TaskInfoVisitor
 {
@@ -1445,22 +1452,21 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
   {
     const TaskID& taskId = task.task_id();
 
-    if (ids.contains(taskId) || framework.tasks.contains(taskId)) {
+    if (framework.pendingTasks.contains(taskId) ||
+        framework.tasks.contains(taskId)) {
       return "Task has duplicate ID: " + taskId.value();
     }
-
-    ids.insert(taskId);
-
     return None();
   }
-
-  hashset<TaskID> ids;
 };
 
 
-// Checks that the used resources by a task (and executor if
-// necessary) on each slave does not exceed the total resources
-// offered on that slave
+// Checks that the resources used by a task on each slave do not
+// exceed the total resources offered on that slave.
+// NOTE: We do not account for executor resources here because tasks
+// are launched asynchronously and an executor might exit between
+// validation and actual launch. Therefore executor resources are
+// accounted for in 'Master::_launchTasks()'.
 struct ResourceUsageChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
@@ -1482,11 +1488,10 @@ struct ResourceUsageChecker : TaskInfoVisitor
     // Check if this task uses more resources than offered.
     Resources taskResources = task.resources();
 
-    if (!((usedResources + taskResources) <= resources)) {
+    if (!(taskResources <= resources)) {
       return "Task " + stringify(task.task_id()) + " attempted to use " +
-          stringify(taskResources) + " combined with already used " +
-          stringify(usedResources) + " is greater than offered " +
-          stringify(resources);
+             stringify(taskResources) + " which is greater than offered " +
+             stringify(resources);
     }
 
     // Check this task's executor's resources.
@@ -1496,33 +1501,13 @@ struct ResourceUsageChecker : TaskInfoVisitor
         if (!Resources::isAllocatable(resource)) {
           // TODO(benh): Send back the invalid resources?
           return "Executor for task " + stringify(task.task_id()) +
-              " uses invalid resources " + stringify(resource);
-        }
-      }
-
-      // Check if this task's executor is running, and if not check if
-      // the task + the executor use more resources than offered.
-      if (!executors.contains(task.executor().executor_id())) {
-        if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
-          taskResources += task.executor().resources();
-          if (!((usedResources + taskResources) <= resources)) {
-            return "Task " + stringify(task.task_id()) +
-                   " + executor attempted to use " + stringify(taskResources) +
-                   " combined with already used " + stringify(usedResources) +
-                   " is greater than offered " + stringify(resources);
-          }
+                 " uses invalid resources " + stringify(resource);
         }
-        executors.insert(task.executor().executor_id());
       }
     }
 
-    usedResources += taskResources;
-
     return None();
   }
-
-  Resources usedResources;
-  hashset<ExecutorID> executors;
 };
 
 
@@ -1544,11 +1529,26 @@ struct ExecutorInfoChecker : TaskInfoVisitor
 
     if (task.has_executor()) {
       const ExecutorID& executorId = task.executor().executor_id();
+      Option<ExecutorInfo> executorInfo = None();
+
       if (slave.hasExecutor(framework.id, executorId)) {
-        const Option<ExecutorInfo> executorInfo =
-          slave.executors.get(framework.id).get().get(executorId);
+        executorInfo = slave.executors.get(framework.id).get().get(executorId);
+      } else {
+        // See if any of the pending tasks have the same executor.
+        // Note that picking the first matching executor is ok because
+        // all the matching executors have been added to
+        // 'framework.pendingTasks' after validation and hence have
+        // the same executor info.
+        foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
+          if (task_.has_executor() &&
+              task_.executor().executor_id() == executorId) {
+            executorInfo = task_.executor();
+            break;
+          }
+        }
+      }
 
-        if (!(task.executor() == executorInfo.get())) {
+      if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
           return "Task has invalid ExecutorInfo (existing ExecutorInfo"
               " with same ExecutorID is not compatible).\n"
               "------------------------------------------------------------\n"
@@ -1558,7 +1558,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
               "Task's ExecutorInfo:\n" +
               stringify(task.executor()) + "\n"
               "------------------------------------------------------------\n";
-        }
       }
     }
 
@@ -1854,9 +1853,55 @@ void Master::launchTasks(
             << " on slave " << *slave
             << " for framework " << framework->id;
 
-  Resources usedResources; // Accumulated resources used.
+  // Validate each task and launch if valid.
+  list<Future<Option<Error> > > futures;
+  foreach (const TaskInfo& task, tasks) {
+    futures.push_back(validateTask(task, framework, slave, totalResources));
+
+    // Add to pending tasks.
+    // NOTE: We need to do this here after validation because of the
+    // way task validators work.
+    framework->pendingTasks[task.task_id()] = task;
+  }
+
+  // Wait for all the tasks to be validated.
+  // NOTE: We wait for all tasks because currently the allocator
+  // is expected to get 'resourcesUnused()' once per 'launchTasks()'.
+  await(futures)
+    .onAny(defer(self(),
+                 &Master::_launchTasks,
+                 framework->id,
+                 slaveId.get(),
+                 tasks,
+                 totalResources,
+                 filters,
+                 lambda::_1));
+}
+
+
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+  if (authorized) {
+    return None();
+  }
+
+  return Error(message);
+}
+
+
+Future<Option<Error> > Master::validateTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& totalResources)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
 
   // Create task visitors.
+  // TODO(vinod): Create the visitors on the stack and make the visit
+  // operation const.
   list<TaskInfoVisitor*> taskVisitors;
   taskVisitors.push_back(new TaskIDChecker());
   taskVisitors.push_back(new SlaveIDChecker());
@@ -1867,40 +1912,241 @@ void Master::launchTasks(
 
   // TODO(benh): Add a HealthCheckChecker visitor.
 
-  // Loop through each task and check it's validity.
-  foreach (const TaskInfo& task, tasks) {
-    // Possible error found while checking task's validity.
-    Option<Error> error = None();
+  // Invoke each visitor.
+  Option<Error> error = None();
+  foreach (TaskInfoVisitor* visitor, taskVisitors) {
+    error = (*visitor)(task, totalResources, *framework, *slave);
+    if (error.isSome()) {
+      break;
+    }
+  }
 
-    // Invoke each visitor.
-    foreach (TaskInfoVisitor* visitor, taskVisitors) {
-      error = (*visitor)(task, totalResources, *framework, *slave);
-      if (error.isSome()) {
-        break;
-      }
+  // Cleanup visitors.
+  while (!taskVisitors.empty()) {
+    TaskInfoVisitor* visitor = taskVisitors.front();
+    taskVisitors.pop_front();
+    delete visitor;
+  };
+
+  if (error.isSome()) {
+    return Error(error.get().message);
+  }
+
+  if (authorizer.isNone()) {
+    // Authorization is disabled.
+    return None();
+  }
+
+  // Authorize the task.
+  string user = framework->info.user(); // Default user.
+  if (task.has_command() && task.command().has_user()) {
+    user = task.command().user();
+  } else if (task.has_executor() && task.executor().command().has_user()) {
+    user = task.executor().command().user();
+  }
+
+  LOG(INFO)
+    << "Authorizing framework principal '" << framework->info.principal()
+    << "' to launch task " << task.task_id() << " as user '" << user << "'";
+
+  mesos::ACL::RunTasks request;
+  if (framework->info.has_principal()) {
+    request.mutable_principals()->add_values(framework->info.principal());
+  } else {
+    // Framework doesn't have a principal set.
+    request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  }
+  request.mutable_users()->add_values(user);
+
+  return authorizer.get()->authorize(request).then(
+      lambda::bind(&_authorize,
+                   "Not authorized to launch as user '" + user + "'",
+                   lambda::_1));
+}
+
+
+void Master::launchTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK(!slave->disconnected);
+
+  // Determine if this task launches an executor, and if so make sure
+  // the slave and framework state has been updated accordingly.
+  Option<ExecutorID> executorId;
+
+  if (task.has_executor()) {
+    // TODO(benh): Refactor this code into Slave::addTask.
+    if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
+      CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
+        << "Executor " << task.executor().executor_id()
+        << " known to the framework " << framework->id
+        << " but unknown to the slave " << *slave;
+
+      slave->addExecutor(framework->id, task.executor());
+      framework->addExecutor(slave->id, task.executor());
     }
 
-    if (error.isNone()) {
-      // Task looks good, get it running!
-      usedResources += launchTask(task, framework, slave);
-    } else {
-      // Error validating task, send a failed status update.
-      LOG(WARNING) << "Failed to validate task " << task.task_id()
-                   << " : " << error.get().message;
+    executorId = task.executor().executor_id();
+  }
+
+  // Add the task to the framework and slave.
+  Task* t = new Task();
+  t->mutable_framework_id()->MergeFrom(framework->id);
+  t->set_state(TASK_STAGING);
+  t->set_name(task.name());
+  t->mutable_task_id()->MergeFrom(task.task_id());
+  t->mutable_slave_id()->MergeFrom(task.slave_id());
+  t->mutable_resources()->MergeFrom(task.resources());
+
+  if (executorId.isSome()) {
+    t->mutable_executor_id()->MergeFrom(executorId.get());
+  }
+
+  framework->addTask(t);
+
+  slave->addTask(t);
+
+  // Tell the slave to launch the task!
+  LOG(INFO) << "Launching task " << task.task_id()
+            << " of framework " << framework->id
+            << " with resources " << task.resources()
+            << " on slave " << *slave;
+
+  RunTaskMessage message;
+  message.mutable_framework()->MergeFrom(framework->info);
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  message.set_pid(framework->pid);
+  message.mutable_task()->MergeFrom(task);
+  send(slave->pid, message);
+
+  stats.tasks[TASK_STAGING]++;
 
+  return;
+}
+
+
+void Master::_launchTasks(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const vector<TaskInfo>& tasks,
+    const Resources& totalResources,
+    const Filters& filters,
+    const Future<list<Future<Option<Error> > > >& validationErrors)
+{
+  CHECK_READY(validationErrors);
+  CHECK_EQ(validationErrors.get().size(), tasks.size());
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring launch tasks message for framework " << frameworkId
+      << " because the framework cannot be found";
+
+    // Tell the allocator about the recovered resources.
+    allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+    return;
+  }
+
+  Slave* slave = getSlave(slaveId);
+  if (slave == NULL || slave->disconnected) {
+    foreach (const TaskInfo& task, tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
-          slave->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          (slave == NULL ? "Slave removed" : "Slave disconnected"));
+
+      LOG(INFO) << "Sending status update " << update << ": "
+                << (slave == NULL ? "Slave removed" : "Slave disconnected");
+
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
+    // Tell the allocator about the recovered resources.
+    allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+    return;
+  }
+
+  Resources usedResources; // Accumulated resources used.
+
+  size_t index = 0;
+  foreach (const Future<Option<Error> >& future, validationErrors.get()) {
+    const TaskInfo& task = tasks[index++];
+
+    // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
+    // for the task was called before we are here.
+    if (!framework->pendingTasks.contains(task.task_id())) {
+      continue;
+    }
+
+    framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+
+    CHECK(!future.isDiscarded());
+    if (future.isFailed() || future.get().isSome()) {
+      const string error = future.isFailed()
+          ? "Authorization failure: " + future.failure()
+          : future.get().get().message;
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          error);
+
+      LOG(INFO)
+        << "Sending status update " << update << ": " << error;
+
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+
+      continue;
+    }
+
+    // Check if resources needed by the task (and its executor in case
+    // the executor is new) are available. These resources will be
+    // added by 'launchTask()' below.
+    Resources resources = task.resources();
+    if (task.has_executor() &&
+        !slave->hasExecutor(framework->id, task.executor().executor_id())) {
+      resources += task.executor().resources();
+    }
+
+    if (!(usedResources + resources <= totalResources)) {
+      const string error =
+        "Task uses more resources " + stringify(resources) +
+        " than available " + stringify(totalResources - usedResources);
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          error.get().message);
+          error);
+
+      LOG(INFO) << "Sending status update " << update << " for invalid task: "
+                << error;
 
-      LOG(INFO) << "Sending status update "
-                << update << " for invalid task";
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
       send(framework->pid, message);
+
+      continue;
     }
+
+    // Launch task.
+    launchTask(task, framework, slave);
+    usedResources += resources;
   }
 
   // All used resources should be allocatable, enforced by our validators.
@@ -1911,19 +2157,8 @@ void Master::launchTasks(
 
   if (unusedResources.allocatable().size() > 0) {
     // Tell the allocator about the unused (e.g., refused) resources.
-    allocator->resourcesUnused(
-        framework->id,
-        slave->id,
-        unusedResources,
-        filters);
+    allocator->resourcesUnused(frameworkId, slaveId, unusedResources, filters);
   }
-
-  // Cleanup visitors.
-  while (!taskVisitors.empty()) {
-    TaskInfoVisitor* visitor = taskVisitors.front();
-    taskVisitors.pop_front();
-    delete visitor;
-  };
 }
 
 
@@ -1981,6 +2216,24 @@ void Master::killTask(
     return;
   }
 
+  if (framework->pendingTasks.contains(taskId)) {
+    // Remove from pending tasks.
+    framework->pendingTasks.erase(taskId);
+
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->set_state(TASK_KILLED);
+    status->set_message("Killed pending task");
+    update->set_timestamp(Clock::now().secs());
+    update->set_uuid(UUID::random().toBytes());
+    send(framework->pid, message);
+
+    return;
+  }
+
   Task* task = framework->getTask(taskId);
   if (task == NULL) {
     // TODO(bmahler): Per MESOS-1200, if we knew the SlaveID here we
@@ -2778,6 +3031,13 @@ void Master::reconcileTasks(
       continue;
     }
 
+    if (framework->pendingTasks.contains(status.task_id())) {
+      LOG(WARNING) << "Status for task " << status.task_id()
+                   << " from framework " << frameworkId
+                   << " is unknown since the task is pending";
+      continue;
+    }
+
     Option<StatusUpdate> update;
 
     // Check for a removed slave (case 1).
@@ -3052,73 +3312,6 @@ vector<Framework*> Master::getActiveFrameworks() const
 }
 
 
-Resources Master::launchTask(const TaskInfo& task,
-                             Framework* framework,
-                             Slave* slave)
-{
-  CHECK_NOTNULL(framework);
-  CHECK_NOTNULL(slave);
-
-  Resources resources; // Total resources used on slave by launching this task.
-
-  // Determine if this task launches an executor, and if so make sure
-  // the slave and framework state has been updated accordingly.
-  Option<ExecutorID> executorId;
-
-  if (task.has_executor()) {
-    // TODO(benh): Refactor this code into Slave::addTask.
-    if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
-      CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
-        << "Executor " << task.executor().executor_id()
-        << " known to the framework " << framework->id
-        << " but unknown to the slave " << *slave;
-
-      slave->addExecutor(framework->id, task.executor());
-      framework->addExecutor(slave->id, task.executor());
-      resources += task.executor().resources();
-    }
-
-    executorId = task.executor().executor_id();
-  }
-
-  // Add the task to the framework and slave.
-  Task* t = new Task();
-  t->mutable_framework_id()->MergeFrom(framework->id);
-  t->set_state(TASK_STAGING);
-  t->set_name(task.name());
-  t->mutable_task_id()->MergeFrom(task.task_id());
-  t->mutable_slave_id()->MergeFrom(task.slave_id());
-  t->mutable_resources()->MergeFrom(task.resources());
-
-  if (executorId.isSome()) {
-    t->mutable_executor_id()->MergeFrom(executorId.get());
-  }
-
-  framework->addTask(t);
-
-  slave->addTask(t);
-
-  resources += task.resources();
-
-  // Tell the slave to launch the task!
-  LOG(INFO) << "Launching task " << task.task_id()
-            << " of framework " << framework->id
-            << " with resources " << task.resources()
-            << " on slave " << *slave;
-
-  RunTaskMessage message;
-  message.mutable_framework()->MergeFrom(framework->info);
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  message.set_pid(framework->pid);
-  message.mutable_task()->MergeFrom(task);
-  send(slave->pid, message);
-
-  stats.tasks[TASK_STAGING]++;
-
-  return resources;
-}
-
-
 // NOTE: This function is only called when the slave re-registers
 // with a master that already knows about it (i.e., not a failed
 // over master).
@@ -3350,6 +3543,9 @@ void Master::removeFramework(Framework* framework)
     send(slave->pid, message);
   }
 
+  // Remove the pending tasks from the framework.
+  framework->pendingTasks.clear();
+
   // Remove pointers to the framework's tasks in slaves.
   foreachvalue (Task* task, utils::copy(framework->tasks)) {
     Slave* slave = getSlave(task->slave_id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 75f0d49..0c68a5b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,10 +86,10 @@ class SlaveObserver;
 class WhitelistWatcher;
 
 struct Framework;
-struct Slave;
-struct Role;
 struct OfferVisitor;
-
+struct Role;
+struct Slave;
+struct TaskInfoVisitor;
 
 class Master : public ProtobufProcess<Master>
 {
@@ -310,11 +310,27 @@ protected:
       const std::vector<StatusUpdate>& updates,
       const process::Future<bool>& removed);
 
-  // Launch a task from a task description, and returned the consumed
-  // resources for the task and possibly it's executor.
-  Resources launchTask(const TaskInfo& task,
-                       Framework* framework,
-                       Slave* slave);
+  // Validates the task including authorization.
+  // Returns None if the task is valid.
+  // Returns Error if the task is invalid.
+  // Returns Failure if authorization returns 'Failure'.
+  process::Future<Option<Error> > validateTask(
+      const TaskInfo& task,
+      Framework* framework,
+      Slave* slave,
+      const Resources& totalResources);
+
+  // Launch a task from a task description.
+  void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
+  // 'launchTasks()' continuation.
+  void _launchTasks(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const std::vector<TaskInfo>& tasks,
+      const Resources& totalResources,
+      const Filters& filters,
+      const process::Future<std::list<process::Future<Option<Error> > > >& f);
 
   // Remove a task.
   void removeTask(Task* task);
@@ -901,6 +917,10 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
+  // Tasks that have not yet been launched because they are being
+  // validated (e.g., authorized).
+  hashmap<TaskID, TaskInfo> pendingTasks;
+
   hashmap<TaskID, Task*> tasks;
 
   // NOTE: We use a shared pointer for Task because clang doesn't like

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
new file mode 100644
index 0000000..17debaf
--- /dev/null
+++ b/src/tests/master_authorization_tests.cpp
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterAuthorizationTest : public MesosTest {};
+
+
+// This test verifies that an authorized task launch is successful.
+TEST_F(MasterAuthorizationTest, AuthorizedTask)
+{
+  // Setup ACLs so that the framework can only launch tasks as "foo".
+  ACLs acls;
+  acls.set_permissive(false);
+  mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+  acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+  acl->mutable_users()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Create an authorized executor.
+  ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+  executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+  executor.mutable_command()->set_user("foo");
+
+  MockExecutor exec(executor.executor_id());
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create an authorized task.
+  TaskInfo task;
+  task.set_name("test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(executor);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that an unauthorized task launch is rejected.
+TEST_F(MasterAuthorizationTest, UnauthorizedTask)
+{
+  // Setup ACLs so that no framework can launch as "foo".
+  ACLs acls;
+  mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+  acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+  acl->mutable_users()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Create an unauthorized executor.
+  ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+  executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+  executor.mutable_command()->set_user("foo");
+
+  MockExecutor exec(executor.executor_id());
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create an unauthorized task.
+  TaskInfo task;
+  task.set_name("test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(executor);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}


[3/7] git commit: Split Authorizer into source and header files.

Posted by vi...@apache.org.
Split Authorizer into source and header files.

Review: https://reviews.apache.org/r/22149


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6267a0f0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6267a0f0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6267a0f0

Branch: refs/heads/vinod/authorize_tasks
Commit: 6267a0f011ecaa8e0ecd151c4d02e5285b680931
Parents: dd94a1f
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat May 31 19:03:08 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am               |   6 +-
 src/authorizer/authorizer.cpp | 333 +++++++++++++++++++++++++++++++++++++
 src/authorizer/authorizer.hpp | 310 +---------------------------------
 3 files changed, 338 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 4a3f2e1..1d49dca 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -210,7 +210,7 @@ nodist_libmesos_no_3rdparty_la_SOURCES =				\
   $(REGISTRY_PROTOS)
 
 libmesos_no_3rdparty_la_SOURCES =					\
-	authorizer/authorizer.hpp					\
+	authorizer/authorizer.cpp					\
 	sasl/authenticatee.hpp						\
 	sasl/authenticator.hpp						\
 	sasl/auxprop.hpp						\
@@ -325,7 +325,9 @@ if WITH_NETWORK_ISOLATOR
 	linux/routing/queueing/internal.hpp
 endif
 
-libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
+libmesos_no_3rdparty_la_SOURCES +=					\
+	authorizer/authorizer.hpp					\
+	common/attributes.hpp						\
 	common/build.hpp common/date_utils.hpp common/factory.hpp	\
 	common/protobuf_utils.hpp					\
 	common/http.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/authorizer/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/authorizer.cpp b/src/authorizer/authorizer.cpp
new file mode 100644
index 0000000..c46b31d
--- /dev/null
+++ b/src/authorizer/authorizer.cpp
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+
+#include "authorizer/authorizer.hpp"
+
+#include "mesos/mesos.hpp"
+
+using process::Future;
+using process::Owned;
+using process::dispatch;
+
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+
+class LocalAuthorizerProcess : public ProtobufProcess<LocalAuthorizerProcess>
+{
+public:
+  LocalAuthorizerProcess(const ACLs& _acls)
+    : ProcessBase(process::ID::generate("authorizer")), acls(_acls) {}
+
+  Future<bool> authorize(const ACL::RunTasks& request)
+  {
+    foreach (const ACL::RunTasks& acl, acls.run_tasks()) {
+      // ACL matches if both subjects and objects match.
+      if (matches(request.principals(), acl.principals()) &&
+          matches(request.users(), acl.users())) {
+        // ACL is allowed if both subjects and objects are allowed.
+        return allows(request.principals(), acl.principals()) &&
+               allows(request.users(), acl.users());
+      }
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+
+  Future<bool> authorize(const ACL::ReceiveOffers& request)
+  {
+    foreach (const ACL::ReceiveOffers& acl, acls.receive_offers()) {
+      // ACL matches if both subjects and objects match.
+      if (matches(request.principals(), acl.principals()) &&
+          matches(request.roles(), acl.roles())) {
+        // ACL is allowed if both subjects and objects are allowed.
+        return allows(request.principals(), acl.principals()) &&
+               allows(request.roles(), acl.roles());
+      }
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+
+  Future<bool> authorize(const ACL::HTTPGet& request)
+  {
+    foreach (const ACL::HTTPGet& acl, acls.http_get()) {
+      // ACL matches if both subjects and objects match.
+      if (matches(request.usernames(), acl.usernames()) &&
+          matches(request.ips(), acl.ips()) &&
+          matches(request.hostnames(), acl.hostnames()) &&
+          matches(request.urls(), acl.urls())) {
+        // ACL is allowed if both subjects and objects are allowed.
+        return allows(request.usernames(), acl.usernames()) &&
+               allows(request.ips(), acl.ips()) &&
+               allows(request.hostnames(), acl.hostnames()) &&
+               allows(request.urls(), acl.urls());
+      }
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+
+  Future<bool> authorize(const ACL::HTTPPut& request)
+  {
+    foreach (const ACL::HTTPPut& acl, acls.http_put()) {
+      // ACL matches if both subjects and objects match.
+      if (matches(request.usernames(), acl.usernames()) &&
+          matches(request.ips(), acl.ips()) &&
+          matches(request.hostnames(), acl.hostnames()) &&
+          matches(request.urls(), acl.urls())) {
+        // ACL is allowed if both subjects and objects are allowed.
+        return allows(request.usernames(), acl.usernames()) &&
+               allows(request.ips(), acl.ips()) &&
+               allows(request.hostnames(), acl.hostnames()) &&
+               allows(request.urls(), acl.urls());
+      }
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+private:
+  // Match matrix:
+  //
+  //                  -----------ACL----------
+  //
+  //                    SOME    NONE    ANY
+  //          -------|-------|-------|-------
+  //  |        SOME  | Yes/No|  Yes  |   Yes
+  //  |       -------|-------|-------|-------
+  // Request   NONE  |  No   |  Yes  |   No
+  //  |       -------|-------|-------|-------
+  //  |        ANY   |  No   |  Yes  |   Yes
+  //          -------|-------|-------|-------
+  bool matches(const ACL::Entity& request, const ACL::Entity& acl)
+  {
+    // NONE only matches with NONE.
+    if (request.type() == ACL::Entity::NONE) {
+      return acl.type() == ACL::Entity::NONE;
+    }
+
+    // ANY matches with ANY or NONE.
+    if (request.type() == ACL::Entity::ANY) {
+      return acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE;
+    }
+
+    if (request.type() == ACL::Entity::SOME) {
+      // SOME matches with ANY or NONE.
+      if (acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE) {
+        return true;
+      }
+
+      // SOME matches if the request values are a subset of ACL
+      // values.
+      foreach (const string& value, request.values()) {
+        bool found = false;
+        foreach (const string& value_, acl.values()) {
+          if (value == value_) {
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  // Allow matrix:
+  //
+  //                 -----------ACL----------
+  //
+  //                    SOME    NONE    ANY
+  //          -------|-------|-------|-------
+  //  |        SOME  | Yes/No|  No   |   Yes
+  //  |       -------|-------|-------|-------
+  // Request   NONE  |  No   |  Yes  |   No
+  //  |       -------|-------|-------|-------
+  //  |        ANY   |  No   |  No   |   Yes
+  //          -------|-------|-------|-------
+  bool allows(const ACL::Entity& request, const ACL::Entity& acl)
+  {
+    // NONE is only allowed by NONE.
+    if (request.type() == ACL::Entity::NONE) {
+      return acl.type() == ACL::Entity::NONE;
+    }
+
+    // ANY is only allowed by ANY.
+    if (request.type() == ACL::Entity::ANY) {
+      return acl.type() == ACL::Entity::ANY;
+    }
+
+    if (request.type() == ACL::Entity::SOME) {
+      // SOME is allowed by ANY.
+      if (acl.type() == ACL::Entity::ANY) {
+        return true;
+      }
+
+      // SOME is not allowed by NONE.
+      if (acl.type() == ACL::Entity::NONE) {
+        return false;
+      }
+
+      // SOME is allowed if the request values are a subset of ACL
+      // values.
+      foreach (const string& value, request.values()) {
+        bool found = false;
+        foreach (const string& value_, acl.values()) {
+          if (value == value_) {
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  ACLs acls;
+};
+
+
+Try<Owned<Authorizer> > Authorizer::create(const JSON::Object& acls_)
+{
+  // Convert ACLs from JSON to Protobuf.
+  Try<ACLs> acls = protobuf::parse<ACLs>(acls_);
+  if (acls.isError()) {
+    return Error("Invalid ACLs format: " + acls.error());
+  }
+
+  Try<Owned<LocalAuthorizer> > authorizer = LocalAuthorizer::create(acls.get());
+
+  if (authorizer.isError()) {
+    return Error(authorizer.error());
+  }
+
+  Owned<LocalAuthorizer> authorizer_ = authorizer.get();
+  return static_cast<Authorizer*>(authorizer_.release());
+}
+
+
+LocalAuthorizer::LocalAuthorizer(const ACLs& acls)
+{
+  process = new LocalAuthorizerProcess(acls);
+  spawn(process);
+}
+
+
+LocalAuthorizer::~LocalAuthorizer()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+Try<Owned<LocalAuthorizer> > LocalAuthorizer::create(const ACLs& acls)
+{
+  // Validate ACLs.
+  foreach (const ACL::HTTPGet& acl, acls.http_get()) {
+    // At least one of the subjects should be set.
+    if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
+      return Error("At least one of the subjects should be set for ACL: " +
+                    acl.DebugString());
+    }
+  }
+
+  foreach (const ACL::HTTPPut& acl, acls.http_put()) {
+    // At least one of the subjects should be set.
+    if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
+       return Error("At least one of the subjects should be set for ACL: " +
+                     acl.DebugString());
+     }
+  }
+
+  return new LocalAuthorizer(acls);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::RunTasks& request)
+{
+  // Necessary to disambiguate.
+  typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::RunTasks&);
+
+  return dispatch(
+      process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::ReceiveOffers& request)
+{
+  // Necessary to disambiguate.
+  typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::ReceiveOffers&);
+
+  return dispatch(
+      process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::HTTPGet& request)
+{
+  // Necessary to disambiguate.
+  typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::HTTPGet&);
+
+  return dispatch(
+      process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+
+Future<bool> LocalAuthorizer::authorize(const ACL::HTTPPut& request)
+{
+  // Necessary to disambiguate.
+  typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::HTTPPut&);
+
+  return dispatch(
+      process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6267a0f0/src/authorizer/authorizer.hpp
----------------------------------------------------------------------
diff --git a/src/authorizer/authorizer.hpp b/src/authorizer/authorizer.hpp
index b0d1eae..43e1c3b 100644
--- a/src/authorizer/authorizer.hpp
+++ b/src/authorizer/authorizer.hpp
@@ -19,22 +19,13 @@
 #ifndef __AUTHORIZER_AUTHORIZER_HPP__
 #define __AUTHORIZER_AUTHORIZER_HPP__
 
-#include <string>
-#include <vector>
-
 #include <glog/logging.h>
 
-#include <process/dispatch.hpp>
 #include <process/future.hpp>
-#include <process/id.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
-#include <process/protobuf.hpp>
 
-#include <stout/check.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/protobuf.hpp>
+#include <stout/json.hpp>
 #include <stout/try.hpp>
 
 #include "mesos/mesos.hpp"
@@ -90,305 +81,6 @@ private:
   LocalAuthorizerProcess* process;
 };
 
-
-class LocalAuthorizerProcess : public ProtobufProcess<LocalAuthorizerProcess>
-{
-public:
-  LocalAuthorizerProcess(const ACLs& _acls)
-    : ProcessBase(process::ID::generate("authorizer")), acls(_acls) {}
-
-  process::Future<bool> authorize(const ACL::RunTasks& request)
-  {
-    foreach (const ACL::RunTasks& acl, acls.run_tasks()) {
-      // ACL matches if both subjects and objects match.
-      if (matches(request.principals(), acl.principals()) &&
-          matches(request.users(), acl.users())) {
-        // ACL is allowed if both subjects and objects are allowed.
-        return allows(request.principals(), acl.principals()) &&
-               allows(request.users(), acl.users());
-      }
-    }
-
-    return acls.permissive(); // None of the ACLs match.
-  }
-
-  process::Future<bool> authorize(const ACL::ReceiveOffers& request)
-  {
-    foreach (const ACL::ReceiveOffers& acl, acls.receive_offers()) {
-      // ACL matches if both subjects and objects match.
-      if (matches(request.principals(), acl.principals()) &&
-          matches(request.roles(), acl.roles())) {
-        // ACL is allowed if both subjects and objects are allowed.
-        return allows(request.principals(), acl.principals()) &&
-               allows(request.roles(), acl.roles());
-      }
-    }
-
-    return acls.permissive(); // None of the ACLs match.
-  }
-
-  process::Future<bool> authorize(const ACL::HTTPGet& request)
-  {
-    foreach (const ACL::HTTPGet& acl, acls.http_get()) {
-      // ACL matches if both subjects and objects match.
-      if (matches(request.usernames(), acl.usernames()) &&
-          matches(request.ips(), acl.ips()) &&
-          matches(request.hostnames(), acl.hostnames()) &&
-          matches(request.urls(), acl.urls())) {
-        // ACL is allowed if both subjects and objects are allowed.
-        return allows(request.usernames(), acl.usernames()) &&
-               allows(request.ips(), acl.ips()) &&
-               allows(request.hostnames(), acl.hostnames()) &&
-               allows(request.urls(), acl.urls());
-      }
-    }
-
-    return acls.permissive(); // None of the ACLs match.
-  }
-
-  process::Future<bool> authorize(const ACL::HTTPPut& request)
-  {
-    foreach (const ACL::HTTPPut& acl, acls.http_put()) {
-      // ACL matches if both subjects and objects match.
-      if (matches(request.usernames(), acl.usernames()) &&
-          matches(request.ips(), acl.ips()) &&
-          matches(request.hostnames(), acl.hostnames()) &&
-          matches(request.urls(), acl.urls())) {
-        // ACL is allowed if both subjects and objects are allowed.
-        return allows(request.usernames(), acl.usernames()) &&
-               allows(request.ips(), acl.ips()) &&
-               allows(request.hostnames(), acl.hostnames()) &&
-               allows(request.urls(), acl.urls());
-      }
-    }
-
-    return acls.permissive(); // None of the ACLs match.
-  }
-private:
-  // Match matrix:
-  //
-  //                  -----------ACL----------
-  //
-  //                    SOME    NONE    ANY
-  //          -------|-------|-------|-------
-  //  |        SOME  | Yes/No|  Yes  |   Yes
-  //  |       -------|-------|-------|-------
-  // Request   NONE  |  No   |  Yes  |   No
-  //  |       -------|-------|-------|-------
-  //  |        ANY   |  No   |  Yes  |   Yes
-  //          -------|-------|-------|-------
-  bool matches(const ACL::Entity& request, const ACL::Entity& acl)
-  {
-    // NONE only matches with NONE.
-    if (request.type() == ACL::Entity::NONE) {
-      return acl.type() == ACL::Entity::NONE;
-    }
-
-    // ANY matches with ANY or NONE.
-    if (request.type() == ACL::Entity::ANY) {
-      return acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE;
-    }
-
-    if (request.type() == ACL::Entity::SOME) {
-      // SOME matches with ANY or NONE.
-      if (acl.type() == ACL::Entity::ANY || acl.type() == ACL::Entity::NONE) {
-        return true;
-      }
-
-      // SOME is allowed if the request values are a subset of ACL
-      // values.
-      foreach (const std::string& value, request.values()) {
-        bool found = false;
-        foreach (const std::string& value_, acl.values()) {
-          if (value == value_) {
-            found = true;
-            break;
-          }
-        }
-
-        if (!found) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    return false;
-  }
-
-  // Allow matrix:
-  //
-  //                 -----------ACL----------
-  //
-  //                    SOME    NONE    ANY
-  //          -------|-------|-------|-------
-  //  |        SOME  | Yes/No|  No   |   Yes
-  //  |       -------|-------|-------|-------
-  // Request   NONE  |  No   |  Yes  |   No
-  //  |       -------|-------|-------|-------
-  //  |        ANY   |  No   |  No   |   Yes
-  //          -------|-------|-------|-------
-  bool allows(const ACL::Entity& request, const ACL::Entity& acl)
-  {
-    // NONE is only allowed by NONE.
-    if (request.type() == ACL::Entity::NONE) {
-      return acl.type() == ACL::Entity::NONE;
-    }
-
-    // ANY is only allowed by ANY.
-    if (request.type() == ACL::Entity::ANY) {
-      return acl.type() == ACL::Entity::ANY;
-    }
-
-    if (request.type() == ACL::Entity::SOME) {
-      // SOME is allowed by ANY.
-      if (acl.type() == ACL::Entity::ANY) {
-        return true;
-      }
-
-      // SOME is not allowed by NONE.
-      if (acl.type() == ACL::Entity::NONE) {
-        return false;
-      }
-
-      // SOME is allowed if the request values are a subset of ACL
-      // values.
-      foreach (const std::string& value, request.values()) {
-        bool found = false;
-        foreach (const std::string& value_, acl.values()) {
-          if (value == value_) {
-            found = true;
-            break;
-          }
-        }
-
-        if (!found) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    return false;
-  }
-
-  ACLs acls;
-};
-
-
-Try<process::Owned<Authorizer> > Authorizer::create(const JSON::Object& acls_)
-{
-  // Convert ACLs from JSON to Protobuf.
-  Try<ACLs> acls = protobuf::parse<ACLs>(acls_);
-  if (acls.isError()) {
-    return Error("Invalid ACLs format: " + acls.error());
-  }
-
-  Try<process::Owned<LocalAuthorizer> > authorizer =
-    LocalAuthorizer::create(acls.get());
-
-  if (authorizer.isError()) {
-    return Error(authorizer.error());
-  }
-
-  process::Owned<LocalAuthorizer> authorizer_ = authorizer.get();
-  return static_cast<Authorizer*>(authorizer_.release());
-}
-
-
-LocalAuthorizer::LocalAuthorizer(const ACLs& acls)
-{
-  process = new LocalAuthorizerProcess(acls);
-  process::spawn(process);
-}
-
-
-LocalAuthorizer::~LocalAuthorizer()
-{
-  process::terminate(process);
-  process::wait(process);
-  delete process;
-}
-
-
-Try<process::Owned<LocalAuthorizer> > LocalAuthorizer::create(const ACLs& acls)
-{
-  // Validate ACLs.
-  foreach (const ACL::HTTPGet& acl, acls.http_get()) {
-    // At least one of the subjects should be set.
-    if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
-      return Error("At least one of the subjects should be set for ACL: " +
-                    acl.DebugString());
-    }
-  }
-
-  foreach (const ACL::HTTPPut& acl, acls.http_put()) {
-    // At least one of the subjects should be set.
-    if (acl.has_usernames() + acl.has_ips() + acl.has_hostnames() < 1) {
-       return Error("At least one of the subjects should be set for ACL: " +
-                     acl.DebugString());
-     }
-  }
-
-  return new LocalAuthorizer(acls);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
-    const ACL::RunTasks& request)
-{
-  // Necessary to disambiguate.
-  typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
-      const ACL::RunTasks&);
-
-  return process::dispatch(
-      process,
-      static_cast<F>(&LocalAuthorizerProcess::authorize),
-      request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
-    const ACL::ReceiveOffers& request)
-{
-  // Necessary to disambiguate.
-  typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
-      const ACL::ReceiveOffers&);
-
-  return process::dispatch(
-      process,
-      static_cast<F>(&LocalAuthorizerProcess::authorize),
-      request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
-    const ACL::HTTPGet& request)
-{
-  // Necessary to disambiguate.
-  typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
-      const ACL::HTTPGet&);
-
-  return process::dispatch(
-      process,
-      static_cast<F>(&LocalAuthorizerProcess::authorize),
-      request);
-}
-
-
-process::Future<bool> LocalAuthorizer::authorize(
-    const ACL::HTTPPut& request)
-{
-  // Necessary to disambiguate.
-  typedef process::Future<bool>(LocalAuthorizerProcess::*F)(
-      const ACL::HTTPPut&);
-
-  return process::dispatch(
-      process,
-      static_cast<F>(&LocalAuthorizerProcess::authorize),
-      request);
-}
-
 } // namespace internal {
 } // namespace mesos {
 


[6/7] git commit: Added MockAuthorizer and more authorization tests.

Posted by vi...@apache.org.
Added MockAuthorizer and more authorization tests.

Review: https://reviews.apache.org/r/22190


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c4084554
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c4084554
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c4084554

Branch: refs/heads/vinod/authorize_tasks
Commit: c4084554e8245397c168d44d0e54a1c2491ca3a8
Parents: a6c4ee7
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 2 22:20:11 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/tests/cluster.hpp                    |  44 ++-
 src/tests/master_authorization_tests.cpp | 396 +++++++++++++++++++++++++-
 src/tests/mesos.cpp                      |  24 ++
 src/tests/mesos.hpp                      |  46 +++
 4 files changed, 502 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 449165c..1c96ee7 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -98,12 +98,25 @@ public:
     Try<process::PID<master::Master> > start(
         const master::Flags& flags = master::Flags());
 
+    // Start and manage a new master using the specified allocator
+    // process.
+    Try<process::PID<master::Master> > start(
+        master::allocator::AllocatorProcess* allocatorProcess,
+        const master::Flags& flags = master::Flags());
+
+    // Start and manage a new master using the specified authorizer.
+    Try<process::PID<master::Master> > start(
+        Authorizer* authorizer,
+        const master::Flags& flags = master::Flags());
+
     // Start and manage a new master using the specified flags.
-    // An allocator process may be specified in which case it will outlive
-    // the launched master.  If no allocator process is specified then
-    // the default allocator will be instantiated.
+    // An allocator process or authorizer may be specified in which
+    // case it will outlive the launched master. If either allocator
+    // process or authorizer is not specified then the default
+    // allocator or authorizer will be used.
     Try<process::PID<master::Master> > start(
         const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
+        const Option<Authorizer*>& authorizer,
         const master::Flags& flags = master::Flags());
 
     // Stops and cleans up a master at the specified PID.
@@ -272,12 +285,29 @@ inline void Cluster::Masters::shutdown()
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
     const master::Flags& flags)
 {
-  return start(None(), flags);
+  return start(None(), None(), flags);
+}
+
+
+inline Try<process::PID<master::Master> > Cluster::Masters::start(
+    master::allocator::AllocatorProcess* allocator,
+    const master::Flags& flags)
+{
+  return start(allocator, None(), flags);
+}
+
+
+inline Try<process::PID<master::Master> > Cluster::Masters::start(
+    Authorizer* authorizer,
+    const master::Flags& flags)
+{
+  return start(None(), authorizer, flags);
 }
 
 
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
     const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
+    const Option<Authorizer*>& authorizer,
     const master::Flags& flags)
 {
   // Disallow multiple masters when not using ZooKeeper.
@@ -354,7 +384,9 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
     master.detector = new StandaloneMasterDetector();
   }
 
-  if (flags.acls.isSome()) {
+  if (authorizer.isSome()) {
+    CHECK_NOTNULL(authorizer.get());
+  } else if (flags.acls.isSome()) {
     Try<process::Owned<Authorizer> > authorizer_ =
       Authorizer::create(flags.acls.get());
     if (authorizer_.isError()) {
@@ -372,7 +404,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
       &cluster->files,
       master.contender,
       master.detector,
-      master.authorizer,
+      authorizer.isSome() ? authorizer : master.authorizer,
       flags);
 
   if (url.isNone()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 17debaf..3a28ca2 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -31,6 +31,7 @@
 #include <stout/gtest.hpp>
 #include <stout/try.hpp>
 
+#include "master/allocator.hpp"
 #include "master/master.hpp"
 
 #include "messages/messages.hpp"
@@ -45,16 +46,21 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
+using mesos::internal::master::allocator::AllocatorProcess;
+
 using mesos::internal::slave::Slave;
 
 using process::Clock;
 using process::Future;
 using process::PID;
+using process::Promise;
 
 using std::vector;
 
 using testing::_;
+using testing::An;
 using testing::AtMost;
+using testing::DoAll;
 using testing::Return;
 
 
@@ -64,9 +70,8 @@ class MasterAuthorizationTest : public MesosTest {};
 // This test verifies that an authorized task launch is successful.
 TEST_F(MasterAuthorizationTest, AuthorizedTask)
 {
-  // Setup ACLs so that the framework can only launch tasks as "foo".
+  // Setup ACLs so that the framework can launch tasks as "foo".
   ACLs acls;
-  acls.set_permissive(false);
   mesos::ACL::RunTasks* acl = acls.add_run_tasks();
   acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
   acl->mutable_users()->add_values("foo");
@@ -207,3 +212,390 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTask)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// This test verifies that a 'killTask()' that comes before
+// '_launchTasks()' is called results in TASK_KILLED.
+TEST_F(MasterAuthorizationTest, KillTask)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Now kill the task.
+  driver.killTask(task.task_id());
+
+  // Framework should get a TASK_KILLED right away.
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_KILLED, status.get().state());
+
+  Future<Nothing> resourcesUnused =
+    FUTURE_DISPATCH(_, &AllocatorProcess::resourcesUnused);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // No task launch should happen resulting in all resources being
+  // returned to the allocator.
+  AWAIT_READY(resourcesUnused);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a slave removal that comes before
+// '_launchTasks()' is called results in TASK_LOST.
+TEST_F(MasterAuthorizationTest, SlaveRemoved)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Now stop the slave.
+  Stop(slave.get());
+
+  AWAIT_READY(slaveLost);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> resourcesRecovered =
+    FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // Framework should get a TASK_LOST.
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  // No task launch should happen resulting in all resources being
+  // returned to the allocator.
+  AWAIT_READY(resourcesRecovered);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a slave disconnection that comes before
+// '_launchTasks()' is called results in TASK_LOST.
+TEST_F(MasterAuthorizationTest, SlaveDisconnected)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  // Create a checkpointing slave so that a disconnected slave is not
+  // immediately removed.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .Times(AtMost(1));
+
+  Future<Nothing> slaveDisconnected =
+    FUTURE_DISPATCH(_, &AllocatorProcess::slaveDisconnected);
+
+  // Now stop the slave.
+  Stop(slave.get());
+
+  AWAIT_READY(slaveDisconnected);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> resourcesRecovered =
+    FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // Framework should get a TASK_LOST.
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  // No task launch should happen resulting in all resources being
+  // returned to the allocator.
+  AWAIT_READY(resourcesRecovered);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a framework removal that comes before
+// '_launchTasks()' is called results in recovery of resources.
+TEST_F(MasterAuthorizationTest, FrameworkRemoved)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  Future<Nothing> frameworkRemoved =
+    FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved);
+
+  // Now stop the framework.
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(frameworkRemoved);
+
+  Future<Nothing> resourcesRecovered =
+    FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // No task launch should happen resulting in all resources being
+  // returned to the allocator.
+  AWAIT_READY(resourcesRecovered);
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that a reconciliation request that comes before
+// '_launchTasks()' is ignored.
+TEST_F(MasterAuthorizationTest, ReconcileTask)
+{
+  MockAuthorizer authorizer;
+  Try<PID<Master> > master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> future;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTasks&>()))
+    .WillOnce(DoAll(FutureSatisfy(&future),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(future);
+
+  // Scheduler shouldn't get an update from reconciliation.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  Future<ReconcileTasksMessage> reconcileTasksMessage =
+    FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _);
+
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(task.task_id());
+  status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  status.set_state(TASK_STAGING);
+
+  statuses.push_back(status);
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(reconcileTasksMessage);
+
+  // Make sure the framework doesn't receive any update.
+  Clock::pause();
+  Clock::settle();
+
+  // Now stop the framework.
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index ea6a1c0..e6d807c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -27,6 +27,8 @@
 #include <stout/stringify.hpp>
 #include <stout/uuid.hpp>
 
+#include "authorizer/authorizer.hpp"
+
 #ifdef __linux__
 #include "linux/cgroups.hpp"
 #endif
@@ -199,6 +201,28 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
 }
 
 
+Try<process::PID<master::Master> > MesosTest::StartMaster(
+    Authorizer* authorizer,
+    const Option<master::Flags>& flags,
+    bool wait)
+{
+  Future<Nothing> detected = FUTURE_DISPATCH(_, &master::Master::detected);
+
+  Try<process::PID<master::Master> > master = cluster.masters.start(
+      authorizer, flags.isNone() ? CreateMasterFlags() : flags.get());
+
+  // Wait until the leader is detected because otherwise this master
+  // may reject authentication requests because it doesn't know it's
+  // the leader yet [MESOS-881].
+  if (wait && master.isSome() && !detected.await(Seconds(10))) {
+    return Error("Failed to wait " + stringify(Seconds(10)) +
+                 " for master to detect the leader");
+  }
+
+  return master;
+}
+
+
 Try<process::PID<slave::Slave> > MesosTest::StartSlave(
     const Option<slave::Flags>& flags)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c4084554/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 2e01e5e..0b9b2f9 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -41,6 +41,8 @@
 #include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
+#include "authorizer/authorizer.hpp"
+
 #include "messages/messages.hpp" // For google::protobuf::Message.
 
 #include "master/allocator.hpp"
@@ -101,6 +103,14 @@ protected:
       const Option<master::Flags>& flags = None(),
       bool wait = true);
 
+  // Starts a master with the specified authorizer and flags.
+  // Waits for the master to detect a leader (could be itself) before
+  // returning if 'wait' is set to true.
+  virtual Try<process::PID<master::Master> > StartMaster(
+      Authorizer* authorizer,
+      const Option<master::Flags>& flags = None(),
+      bool wait = true);
+
   // Starts a slave with the specified flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
       const Option<slave::Flags>& flags = None());
@@ -469,6 +479,42 @@ public:
 };
 
 
+// Definition of a MockAuthorizer that can be used in tests with gmock.
+class MockAuthorizer : public Authorizer
+{
+public:
+  MockAuthorizer()
+  {
+    using ::testing::An;
+    using ::testing::Return;
+
+    // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+    // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
+    // for more details.
+    EXPECT_CALL(*this, authorize(An<const mesos::ACL::RunTasks&>()))
+      .WillRepeatedly(Return(true));
+
+    EXPECT_CALL(*this, authorize(An<const mesos::ACL::ReceiveOffers&>()))
+      .WillRepeatedly(Return(true));
+
+    EXPECT_CALL(*this, authorize(An<const mesos::ACL::HTTPGet&>()))
+      .WillRepeatedly(Return(true));
+
+    EXPECT_CALL(*this, authorize(An<const mesos::ACL::HTTPPut&>()))
+      .WillRepeatedly(Return(true));
+  }
+
+  MOCK_METHOD1(
+      authorize, process::Future<bool>(const ACL::RunTasks& request));
+  MOCK_METHOD1(
+      authorize, process::Future<bool>(const ACL::ReceiveOffers& request));
+  MOCK_METHOD1(
+      authorize, process::Future<bool>(const ACL::HTTPGet& request));
+  MOCK_METHOD1(
+      authorize, process::Future<bool>(const ACL::HTTPPut& request));
+};
+
+
 template <typename T = master::allocator::AllocatorProcess>
 class MockAllocatorProcess : public master::allocator::AllocatorProcess
 {


[4/7] git commit: Fixed a bug in scheduler driver to properly erase 'savedOffers'.

Posted by vi...@apache.org.
Fixed a bug in scheduler driver to properly erase 'savedOffers'.

Review: https://reviews.apache.org/r/22148


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd94a1fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd94a1fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd94a1fe

Branch: refs/heads/vinod/authorize_tasks
Commit: dd94a1fe9aff281f49d61bd8c214f41fcb340b04
Parents: 23a25e7
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu May 29 15:32:03 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dd94a1fe/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9459c9c..6e14f1c 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -890,7 +890,7 @@ protected:
       foreach (const TaskInfo& task, result) {
         // Keep only the slave PIDs where we run tasks so we can send
         // framework messages directly.
-        if (savedOffers.count(offerId) > 0) {
+        if (savedOffers.contains(offerId)) {
           if (savedOffers[offerId].count(task.slave_id()) > 0) {
             savedSlavePids[task.slave_id()] =
               savedOffers[offerId][task.slave_id()];
@@ -902,10 +902,9 @@ protected:
           LOG(WARNING) << "Attempting to launch task " << task.task_id()
                        << " with an unknown offer " << offerId;
         }
-
-        // Remove the offer since we saved all the PIDs we might use.
-        savedOffers.erase(offerId);
       }
+      // Remove the offer since we saved all the PIDs we might use.
+      savedOffers.erase(offerId);
     }
 
     foreach (const TaskInfo& task, result) {


[7/7] git commit: Changed TaskInfoError and OfferError to Option<Error>.

Posted by vi...@apache.org.
Changed TaskInfoError and OfferError to Option<Error>.

Review: https://reviews.apache.org/r/22338


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b73ea627
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b73ea627
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b73ea627

Branch: refs/heads/vinod/authorize_tasks
Commit: b73ea62753bb4e7301edb4691d0b85c3be490fb5
Parents: b508516
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 6 15:45:33 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 57 ++++++++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b73ea627/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7884aa4..df75c8a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -34,6 +34,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/check.hpp>
+#include <stout/error.hpp>
 #include <stout/lambda.hpp>
 #include <stout/memory.hpp>
 #include <stout/multihashmap.hpp>
@@ -1375,11 +1376,9 @@ void Master::resourceRequest(
 // back to the framework for only that task description. An instance
 // will be reused for each task description from same 'launchTasks()',
 // but not for task descriptions from different offers.
-typedef Option<string> TaskInfoError;
-
 struct TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1391,7 +1390,7 @@ struct TaskInfoVisitor
 // Checks that a task id is valid, i.e., contains only valid characters.
 struct TaskIDChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1416,7 +1415,7 @@ struct TaskIDChecker : TaskInfoVisitor
 // Checks that the slave ID used by a task is correct.
 struct SlaveIDChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1438,7 +1437,7 @@ struct SlaveIDChecker : TaskInfoVisitor
 // task tries to re-use an ID.
 struct UniqueTaskIDChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1464,7 +1463,7 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
 // offered on that slave
 struct ResourceUsageChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1531,7 +1530,7 @@ struct ResourceUsageChecker : TaskInfoVisitor
 // ExecutorID) have an identical ExecutorInfo.
 struct ExecutorInfoChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1572,7 +1571,7 @@ struct ExecutorInfoChecker : TaskInfoVisitor
 // launched on a slave that has not enabled checkpointing.
 struct CheckpointChecker : TaskInfoVisitor
 {
-  virtual TaskInfoError operator () (
+  virtual Option<Error> operator () (
       const TaskInfo& task,
       const Resources& resources,
       const Framework& framework,
@@ -1592,11 +1591,9 @@ struct CheckpointChecker : TaskInfoVisitor
 // The error reporting scheme is also similar to TaskInfoVisitor.
 // However, offer processing (and subsequent task processing) is
 // aborted altogether if offer visitor reports an error.
-typedef Option<string> OfferError;
-
 struct OfferVisitor
 {
-  virtual OfferError operator () (
+  virtual Option<Error> operator () (
       const OfferID& offerId,
       const Framework& framework,
       Master* master) = 0;
@@ -1619,14 +1616,14 @@ struct OfferVisitor
 
 // Checks validity/liveness of an offer.
 struct ValidOfferChecker : OfferVisitor {
-  virtual OfferError operator () (
+  virtual Option<Error> operator () (
       const OfferID& offerId,
       const Framework& framework,
       Master* master)
   {
     Offer* offer = getOffer(master, offerId);
     if (offer == NULL) {
-      return "Offer " + stringify(offerId) + " is no longer valid";
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
     }
 
     return None();
@@ -1636,14 +1633,14 @@ struct ValidOfferChecker : OfferVisitor {
 
 // Checks that an offer belongs to the expected framework.
 struct FrameworkChecker : OfferVisitor {
-  virtual OfferError operator () (
+  virtual Option<Error> operator () (
       const OfferID& offerId,
       const Framework& framework,
       Master* master)
   {
     Offer* offer = getOffer(master, offerId);
     if (offer == NULL) {
-      return "Offer " + stringify(offerId) + " is no longer valid";
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
     }
 
     if (!(framework.id == offer->framework_id())) {
@@ -1661,7 +1658,7 @@ struct FrameworkChecker : OfferVisitor {
 // the same slave.
 struct SlaveChecker : OfferVisitor
 {
-  virtual OfferError operator () (
+  virtual Option<Error> operator () (
       const OfferID& offerId,
       const Framework& framework,
       Master* master)
@@ -1703,7 +1700,7 @@ struct SlaveChecker : OfferVisitor
 // Checks that an offer only appears once in offer list.
 struct UniqueOfferIDChecker : OfferVisitor
 {
-  virtual OfferError operator () (
+  virtual Option<Error> operator () (
       const OfferID& offerId,
       const Framework& framework,
       Master* master)
@@ -1782,17 +1779,17 @@ void Master::launchTasks(
   // Verify and aggregate all offers.
   // Abort offer and task processing if any offer validation failed.
   Resources totalResources;
-  OfferError offerError = None();
+  Option<Error> error = None();
   foreach (const OfferID& offerId, offerIds) {
     foreach (OfferVisitor* visitor, offerVisitors) {
-      offerError = (*visitor)(offerId, *framework, this);
-      if (offerError.isSome()) {
+      error = (*visitor)(offerId, *framework, this);
+      if (error.isSome()) {
         break;
       }
     }
     // Offer validation error needs to be propagated from visitor
     // loop above.
-    if (offerError.isSome()) {
+    if (error.isSome()) {
       break;
     }
 
@@ -1819,7 +1816,7 @@ void Master::launchTasks(
   foreach (const OfferID& offerId, offerIds) {
     Offer* offer = getOffer(offerId);
     if (offer != NULL) {
-      if (offerError.isSome()) {
+      if (error.isSome()) {
         allocator->resourcesRecovered(
             offer->framework_id(), offer->slave_id(), offer->resources());
       }
@@ -1827,9 +1824,9 @@ void Master::launchTasks(
     }
   }
 
-  if (offerError.isSome()) {
-    LOG(WARNING) << "Failed to validate offers " << stringify(offerIds)
-                   << ": " << offerError.get();
+  if (error.isSome()) {
+    LOG(WARNING) << "Failed to validate offer " << stringify(offerIds)
+                   << ": " << error.get().message;
 
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -1837,7 +1834,7 @@ void Master::launchTasks(
           task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          "Task launched with invalid offers: " + offerError.get());
+          "Task launched with invalid offers: " + error.get().message);
 
       LOG(INFO) << "Sending status update " << update
                 << " for launch task attempt on invalid offers: "
@@ -1873,7 +1870,7 @@ void Master::launchTasks(
   // Loop through each task and check it's validity.
   foreach (const TaskInfo& task, tasks) {
     // Possible error found while checking task's validity.
-    TaskInfoError error = None();
+    Option<Error> error = None();
 
     // Invoke each visitor.
     foreach (TaskInfoVisitor* visitor, taskVisitors) {
@@ -1889,14 +1886,14 @@ void Master::launchTasks(
     } else {
       // Error validating task, send a failed status update.
       LOG(WARNING) << "Failed to validate task " << task.task_id()
-                   << " : " << error.get();
+                   << " : " << error.get().message;
 
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           slave->id,
           task.task_id(),
           TASK_LOST,
-          error.get());
+          error.get().message);
 
       LOG(INFO) << "Sending status update "
                 << update << " for invalid task";