You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/12/01 07:46:16 UTC

[1/2] mesos git commit: Added authorization to Nested Container API.

Repository: mesos
Updated Branches:
  refs/heads/master 48f6deb2d -> de2a7f414


Added authorization to Nested Container API.

Makes use of the already existing authorization actions and ACL
definitions and wires them together with the existing API
implementations.

Review: https://reviews.apache.org/r/53851/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/de2a7f41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/de2a7f41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/de2a7f41

Branch: refs/heads/master
Commit: de2a7f41407b6b171d10675b7a09bcbfea41564d
Parents: 19296e0
Author: Alexander Rojas <al...@mesosphere.io>
Authored: Wed Nov 30 18:03:40 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Wed Nov 30 23:41:16 2016 -0800

----------------------------------------------------------------------
 src/slave/http.cpp      | 320 +++++++++++++++++++++++++++++++------------
 src/tests/api_tests.cpp |   6 -
 2 files changed, 231 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/de2a7f41/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 6915afe..ace3575 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -2016,78 +2016,114 @@ Future<Response> Slave::Http::launchNestedContainer(
   CHECK_EQ(mesos::agent::Call::LAUNCH_NESTED_CONTAINER, call.type());
   CHECK(call.has_launch_nested_container());
 
-  const ContainerID& containerId =
-    call.launch_nested_container().container_id();
-
-  // We do not yet support launching containers that are nested
-  // two levels beneath the executor's container.
-  if (containerId.parent().has_parent()) {
-    return NotImplemented(
-        "Only a single level of container nesting is supported currently,"
-        " but 'launch_nested_container.container_id.parent.parent' is set");
-  }
+  Future<Owned<ObjectApprover>> approver;
 
-  // Locate the executor (for now we just loop since we don't
-  // index based on container id and this likely won't have a
-  // significant performance impact due to the low number of
-  // executors per-agent).
-  Executor* executor = nullptr;
-  foreachvalue (Framework* framework, slave->frameworks) {
-    foreachvalue (Executor* executor_, framework->executors) {
-      if (executor_->containerId == containerId.parent()) {
-        executor = executor_;
-        break;
-      }
+  if (slave->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
     }
-  }
 
-  // Return a "Bad Request" here rather than "Not Found" since
-  // the executor needs to set parent to its container id.
-  if (executor == nullptr) {
-    return BadRequest("Unable to locate executor for parent container"
-                      " " + stringify(containerId.parent()));
+    approver = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::LAUNCH_NESTED_CONTAINER);
+  } else {
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
   }
 
-  // By default, we use the executor's user.
-  // The command user overrides it if specified.
-  Option<string> user = executor->user;
+  return approver.then(defer(slave->self(),
+    [this, call, contentType](const Owned<ObjectApprover>& launchApprover)
+        -> Future<Response> {
+      const ContainerID& containerId =
+        call.launch_nested_container().container_id();
+
+      // We do not yet support launching containers that are nested
+      // two levels beneath the executor's container.
+      if (containerId.parent().has_parent()) {
+        return NotImplemented(
+            "Only a single level of container nesting is supported currently,"
+            " but 'launch_nested_container.container_id.parent.parent' is set");
+      }
+
+      // Locate the executor (for now we just loop since we don't
+      // index based on container id and this likely won't have a
+      // significant performance impact due to the low number of
+      // executors per-agent).
+      // TODO(adam-mesos): Support more levels of nesting.
+      Executor* executor = nullptr;
+      Framework* framework = nullptr;
+      foreachvalue (Framework* framework_, slave->frameworks) {
+        foreachvalue (Executor* executor_, framework_->executors) {
+          if (executor_->containerId == containerId.parent()) {
+            framework = framework_;
+            executor = executor_;
+            break;
+          }
+        }
+      }
+
+      // Return a "Bad Request" here rather than "Not Found" since
+      // the executor needs to set parent to its container id.
+      if (executor == nullptr || framework == nullptr) {
+        return BadRequest("Unable to locate executor for parent container"
+                          " " + stringify(containerId.parent()));
+      }
+
+      ObjectApprover::Object object;
+      object.executor_info = &(executor->info);
+      object.framework_info = &(framework->info);
+      if (call.launch_nested_container().has_command()) {
+        object.command_info = &(call.launch_nested_container().command());
+      }
+
+      Try<bool> approved = launchApprover.get()->approved(object);
+
+      if (approved.isError()) {
+        return Failure(approved.error());
+      } else if (!approved.get()) {
+        return Forbidden();
+      }
+
+      // By default, we use the executor's user.
+      // The command user overrides it if specified.
+      Option<string> user = executor->user;
 
 #ifndef __WINDOWS__
-  if (call.launch_nested_container().command().has_user()) {
-    user = call.launch_nested_container().command().user();
-  }
+      if (call.launch_nested_container().command().has_user()) {
+        user = call.launch_nested_container().command().user();
+      }
 #endif
 
-  Future<bool> launched = slave->containerizer->launch(
-      containerId,
-      call.launch_nested_container().command(),
-      call.launch_nested_container().has_container()
-        ? call.launch_nested_container().container()
-        : Option<ContainerInfo>::none(),
-      user,
-      slave->info.id());
-
-  // TODO(bmahler): The containerizers currently require that
-  // the caller calls destroy if the launch fails. See MESOS-6214.
-  launched
-    .onFailed(defer(slave->self(), [=](const string& failure) {
-      LOG(WARNING) << "Failed to launch nested container " << containerId
-                   << ": " << failure;
-
-      slave->containerizer->destroy(containerId)
-        .onFailed([=](const string& failure) {
-          LOG(ERROR) << "Failed to destroy neseted container " << containerId
-                     << " after launch failure: " << failure;
-        });
-    }));
+      Future<bool> launched = slave->containerizer->launch(
+          containerId,
+          call.launch_nested_container().command(),
+          call.launch_nested_container().has_container()
+            ? call.launch_nested_container().container()
+            : Option<ContainerInfo>::none(),
+          user,
+          slave->info.id());
+
+      // TODO(bmahler): The containerizers currently require that
+      // the caller calls destroy if the launch fails. See MESOS-6214.
+      launched
+        .onFailed(defer(slave->self(), [=](const string& failure) {
+          LOG(WARNING) << "Failed to launch nested container " << containerId
+                       << ": " << failure;
+
+          slave->containerizer->destroy(containerId)
+            .onFailed([=](const string& failure) {
+              LOG(ERROR) << "Failed to destroy nested container "
+                         << containerId << " after launch failure: " << failure;
+            });
+        }));
 
-  return launched
-    .then([](bool launched) -> Response {
-      if (!launched) {
-        return BadRequest("The provided ContainerInfo is not supported");
-      }
-      return OK();
-    });
+      return launched
+        .then([](bool launched) -> Response {
+          if (!launched) {
+            return BadRequest("The provided ContainerInfo is not supported");
+          }
+          return OK();
+          });
+    }));
 }
 
 
@@ -2099,33 +2135,86 @@ Future<Response> Slave::Http::waitNestedContainer(
   CHECK_EQ(mesos::agent::Call::WAIT_NESTED_CONTAINER, call.type());
   CHECK(call.has_wait_nested_container());
 
-  const ContainerID& containerId =
-    call.wait_nested_container().container_id();
+  Future<Owned<ObjectApprover>> approver;
 
-  Future<Option<mesos::slave::ContainerTermination>> wait =
-    slave->containerizer->wait(containerId);
+  if (slave->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    approver = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::WAIT_NESTED_CONTAINER);
+  } else {
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
 
-  return wait
-    .then([containerId, contentType](
-        const Option<ContainerTermination>& termination) -> Response {
-      if (termination.isNone()) {
+  return approver.then(defer(slave->self(),
+    [this, call, contentType](const Owned<ObjectApprover>& waitApprover)
+        -> Future<Response> {
+      const ContainerID& containerId =
+        call.wait_nested_container().container_id();
+
+      // Locate the executor (for now we just loop since we don't
+      // index based on container id and this likely won't have a
+      // significant performance impact due to the low number of
+      // executors per-agent).
+      // TODO(adam-mesos): Support more levels of nesting.
+      Executor* executor = nullptr;
+      Framework* framework = nullptr;
+      foreachvalue (Framework* framework_, slave->frameworks) {
+        foreachvalue (Executor* executor_, framework_->executors) {
+          if (executor_->containerId == containerId.parent() ||
+              executor_->containerId == containerId) {
+            framework = framework_;
+            executor = executor_;
+            break;
+          }
+        }
+      }
+
+      if (executor == nullptr || framework == nullptr) {
         return NotFound("Container " + stringify(containerId) +
                         " cannot be found");
       }
 
-      mesos::agent::Response response;
-      response.set_type(mesos::agent::Response::WAIT_NESTED_CONTAINER);
+      ObjectApprover::Object object;
+      object.executor_info = &(executor->info);
+      object.framework_info = &(framework->info);
 
-      mesos::agent::Response::WaitNestedContainer* waitNestedContainer =
-        response.mutable_wait_nested_container();
+      Try<bool> approved = waitApprover.get()->approved(object);
 
-      if (termination->has_status()) {
-        waitNestedContainer->set_exit_status(termination->status());
+      if (approved.isError()) {
+        return Failure(approved.error());
+      } else if (!approved.get()) {
+        return Forbidden();
       }
 
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
-    });
+      Future<Option<mesos::slave::ContainerTermination>> wait =
+        slave->containerizer->wait(containerId);
+
+      return wait
+        .then([containerId, contentType](
+            const Option<ContainerTermination>& termination) -> Response {
+          if (termination.isNone()) {
+            return NotFound("Container " + stringify(containerId) +
+                            " cannot be found");
+          }
+
+          mesos::agent::Response response;
+          response.set_type(mesos::agent::Response::WAIT_NESTED_CONTAINER);
+
+          mesos::agent::Response::WaitNestedContainer* waitNestedContainer =
+            response.mutable_wait_nested_container();
+
+          if (termination->has_status()) {
+            waitNestedContainer->set_exit_status(termination->status());
+          }
+
+          return OK(serialize(contentType, evolve(response)),
+                    stringify(contentType));
+        });
+    }));
 }
 
 
@@ -2137,19 +2226,72 @@ Future<Response> Slave::Http::killNestedContainer(
   CHECK_EQ(mesos::agent::Call::KILL_NESTED_CONTAINER, call.type());
   CHECK(call.has_kill_nested_container());
 
-  const ContainerID& containerId =
-    call.kill_nested_container().container_id();
+  Future<Owned<ObjectApprover>> approver;
+
+  if (slave->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
 
-  Future<bool> destroy = slave->containerizer->destroy(containerId);
+    approver = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::KILL_NESTED_CONTAINER);
+  } else {
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
 
-  return destroy
-    .then([containerId](bool found) -> Response {
-      if (!found) {
-        return NotFound("Container '" + stringify(containerId) + "'"
-                        " cannot be found (or is already killed)");
+  return approver.then(defer(slave->self(),
+    [this, call, contentType](const Owned<ObjectApprover>& killApprover)
+        -> Future<Response> {
+      const ContainerID& containerId =
+        call.kill_nested_container().container_id();
+
+      // Locate the executor (for now we just loop since we don't
+      // index based on container id and this likely won't have a
+      // significant performance impact due to the low number of
+      // executors per-agent).
+      // TODO(adam-mesos): Support more levels of nesting.
+      Executor* executor = nullptr;
+      Framework* framework = nullptr;
+      foreachvalue (Framework* framework_, slave->frameworks) {
+        foreachvalue (Executor* executor_, framework_->executors) {
+          if (executor_->containerId == containerId.parent() ||
+              executor_->containerId == containerId) {
+            framework = framework_;
+            executor = executor_;
+            break;
+          }
+        }
       }
-      return OK();
-    });
+
+      if (executor == nullptr || framework == nullptr) {
+        return NotFound("Container " + stringify(containerId) +
+                        " cannot be found");
+      }
+
+      ObjectApprover::Object object;
+      object.executor_info = &(executor->info);
+      object.framework_info = &(framework->info);
+
+      Try<bool> approved = killApprover.get()->approved(object);
+
+      if (approved.isError()) {
+        return Failure(approved.error());
+      } else if (!approved.get()) {
+        return Forbidden();
+      }
+
+      Future<bool> destroy = slave->containerizer->destroy(containerId);
+
+      return destroy
+        .then([containerId](bool found) -> Response {
+          if (!found) {
+            return NotFound("Container '" + stringify(containerId) + "'"
+                            " cannot be found (or is already killed)");
+          }
+          return OK();
+        });
+    }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/de2a7f41/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index ca4f1de..ea6e037 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3290,9 +3290,6 @@ TEST_P(AgentAPITest, NestedContainerWaitNotFound)
 
   // Expect a 404 for waiting on unknown containers.
   {
-    EXPECT_CALL(mockContainerizer, wait(_))
-      .WillOnce(Return(Future<Option<ContainerTermination>>(None())));
-
     v1::agent::Call call;
     call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER);
 
@@ -3345,9 +3342,6 @@ TEST_P(AgentAPITest, NestedContainerKillNotFound)
 
   // Expect a 404 for killing unknown containers.
   {
-    EXPECT_CALL(mockContainerizer, destroy(_))
-      .WillOnce(Return(Future<bool>(false)));
-
     v1::agent::Call call;
     call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER);
 


[2/2] mesos git commit: Added authorization actions for Nested Container and Debug API.

Posted by me...@apache.org.
Added authorization actions for Nested Container and Debug API.

Creates new authorization actions for all the APIs related to
nested containers. This patch does not add the code necessary to
use those actions; that is done in a later patch.

Review: https://reviews.apache.org/r/53541/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/19296e0f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/19296e0f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/19296e0f

Branch: refs/heads/master
Commit: 19296e0fc2bd28f83bafdf5a7ac48146ee085449
Parents: 48f6deb
Author: Alexander Rojas <al...@mesosphere.io>
Authored: Wed Nov 30 17:51:22 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Wed Nov 30 23:41:16 2016 -0800

----------------------------------------------------------------------
 include/mesos/authorizer/acls.proto       |  101 ++
 include/mesos/authorizer/authorizer.hpp   |    8 +-
 include/mesos/authorizer/authorizer.proto |   21 +
 src/authorizer/local/authorizer.cpp       |  215 ++++-
 src/tests/authorization_tests.cpp         | 1207 ++++++++++++++++++++++++
 5 files changed, 1549 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/19296e0f/include/mesos/authorizer/acls.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/acls.proto b/include/mesos/authorizer/acls.proto
index e3fd6a4..3499cac 100644
--- a/include/mesos/authorizer/acls.proto
+++ b/include/mesos/authorizer/acls.proto
@@ -256,6 +256,95 @@ message ACL {
     // access.
     required Entity flags = 2;
   }
+
+  // Which principals are authorized to launch nested containers running as
+  // the given users.
+  message LaunchNestedContainerAsUser {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) to run
+    // the nested containers as.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to launch a nested container under a
+  // top-level container running as the given user.
+  message LaunchNestedContainerUnderParentWithUser {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The operating system users (e.g. linux users) of the top-level
+    // containers under which the principal may launch a nested container.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to launch nested container sessions
+  // running as the given users.
+  message LaunchNestedContainerSessionAsUser {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) a nested
+    // container (TTY) session can be run as.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to launch nested container sessions under
+  // a top-level container whose executor was launched with the given user.
+  message LaunchNestedContainerSessionUnderParentWithUser {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The operating system users (e.g. linux users) of the top-level
+    // containers under which the principal may launch a nested container
+    // session.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to attach to the input of a nested
+  // container whose executor was launched with the given user.
+  message AttachContainerInput {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) whose
+    // containers are available to connect the stdin.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to attach to the output of a nested
+  // container whose executor was launched with the given user.
+  message AttachContainerOutput {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) whose
+    // containers are available to connect the stdout and stderr.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to kill a nested container under a top
+  // level container whose executor was launched with the given user.
+  message KillNestedContainer {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) whose
+    // nested containers can be killed.
+    required Entity users = 2;
+  }
+
+  // Which principals are authorized to wait on a nested container under a top
+  // level container whose executor was launched with the given user.
+  message WaitNestedContainer {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: The list of operating system users (e.g., linux users) whose
+    // nested containers can be waited on.
+    required Entity users = 2;
+  }
 }
 
 
@@ -307,4 +396,16 @@ message ACLs {
   repeated ACL.AccessMesosLog access_mesos_logs = 20;
   repeated ACL.ViewRole view_roles = 21;
   repeated ACL.ViewFlags view_flags = 22;
+  repeated ACL.LaunchNestedContainerAsUser
+      launch_nested_containers_as_user = 23;
+  repeated ACL.LaunchNestedContainerUnderParentWithUser
+      launch_nested_containers_under_parent_with_user = 24;
+  repeated ACL.KillNestedContainer kill_nested_containers = 25;
+  repeated ACL.WaitNestedContainer wait_nested_containers = 26;
+  repeated ACL.LaunchNestedContainerSessionAsUser
+      launch_nested_container_sessions_as_user = 27;
+  repeated ACL.LaunchNestedContainerSessionUnderParentWithUser
+      launch_nested_container_sessions_under_parent_with_user = 28;
+  repeated ACL.AttachContainerInput attach_containers_input = 29;
+  repeated ACL.AttachContainerOutput attach_containers_output = 30;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/19296e0f/include/mesos/authorizer/authorizer.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.hpp b/include/mesos/authorizer/authorizer.hpp
index 3cd0f84..75801cc 100644
--- a/include/mesos/authorizer/authorizer.hpp
+++ b/include/mesos/authorizer/authorizer.hpp
@@ -53,7 +53,8 @@ public:
         executor_info(nullptr),
         quota_info(nullptr),
         weight_info(nullptr),
-        resource(nullptr) {}
+        resource(nullptr),
+        command_info(nullptr) {}
 
     Object(const authorization::Object& object)
       : value(object.has_value() ? &object.value() : nullptr),
@@ -65,7 +66,9 @@ public:
             object.has_executor_info() ? &object.executor_info() : nullptr),
         quota_info(object.has_quota_info() ? &object.quota_info() : nullptr),
         weight_info(object.has_weight_info() ? &object.weight_info() : nullptr),
-        resource(object.has_resource() ? &object.resource() : nullptr) {}
+        resource(object.has_resource() ? &object.resource() : nullptr),
+        command_info(
+            object.has_command_info() ? &object.command_info() : nullptr) {}
 
     const std::string* value;
     const FrameworkInfo* framework_info;
@@ -75,6 +78,7 @@ public:
     const quota::QuotaInfo* quota_info;
     const WeightInfo* weight_info;
     const Resource* resource;
+    const CommandInfo* command_info;
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/mesos/blob/19296e0f/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 0696a62..b7371cd 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -48,6 +48,7 @@ message Object {
   optional quota.QuotaInfo quota_info = 6;
   optional WeightInfo weight_info = 7;
   optional Resource resource = 8;
+  optional CommandInfo command_info = 9;
 }
 
 
@@ -153,6 +154,26 @@ enum Action {
   // This action will not fill in any object fields, since the object
   // is the entire set of flags.
   VIEW_FLAGS = 18;
+
+  // This action will always set the `ExecutorInfo`, `FrameworkInfo` fields
+  // and optionally a `CommandInfo` if available.
+  LAUNCH_NESTED_CONTAINER = 19;
+
+  // This action will set objects of type `ExecutorInfo` and `FrameworkInfo`.
+  KILL_NESTED_CONTAINER = 20;
+
+  // This action will set objects of type `ExecutorInfo` and `FrameworkInfo`.
+  WAIT_NESTED_CONTAINER = 21;
+
+  // This action will always set the `ExecutorInfo`, `FrameworkInfo` fields
+  // and optionally a `CommandInfo` if available.
+  LAUNCH_NESTED_CONTAINER_SESSION = 22;
+
+  // This action will set objects of type `ExecutorInfo` and `FrameworkInfo`.
+  ATTACH_CONTAINER_INPUT = 23;
+
+  // This action will set objects of type `ExecutorInfo` and `FrameworkInfo`.
+  ATTACH_CONTAINER_OUTPUT = 24;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/19296e0f/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index 77e05dd..3b983d0 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -195,7 +195,7 @@ public:
       const GenericACLs& acls,
       const Option<authorization::Subject>& subject,
       const authorization::Action& action,
-      const bool permissive)
+      bool permissive)
     : acls_(acls),
       subject_(subject),
       action_(action),
@@ -344,6 +344,24 @@ public:
 
           break;
         }
+        case authorization::ATTACH_CONTAINER_INPUT:
+        case authorization::ATTACH_CONTAINER_OUTPUT:
+        case authorization::KILL_NESTED_CONTAINER:
+        case authorization::WAIT_NESTED_CONTAINER: {
+          aclObject.set_type(mesos::ACL::Entity::ANY);
+
+          if (object->executor_info != nullptr &&
+              object->executor_info->command().has_user()) {
+            aclObject.add_values(object->executor_info->command().user());
+            aclObject.set_type(mesos::ACL::Entity::SOME);
+          } else if (object->framework_info != nullptr &&
+                     object->framework_info->has_user()) {
+            aclObject.add_values(object->framework_info->user());
+            aclObject.set_type(mesos::ACL::Entity::SOME);
+          }
+
+          break;
+        }
         case authorization::ACCESS_SANDBOX: {
           aclObject.set_type(mesos::ACL::Entity::ANY);
 
@@ -445,6 +463,30 @@ public:
 
           break;
         }
+        case authorization::LAUNCH_NESTED_CONTAINER:
+        case authorization::LAUNCH_NESTED_CONTAINER_SESSION: {
+          aclObject.set_type(mesos::ACL::Entity::ANY);
+
+          if (object->command_info != nullptr) {
+            if (object->command_info->has_user()) {
+              aclObject.add_values(object->command_info->user());
+              aclObject.set_type(mesos::ACL::Entity::SOME);
+            }
+            break;
+          }
+
+          if (object->executor_info != nullptr &&
+              object->executor_info->command().has_user()) {
+            aclObject.add_values(object->executor_info->command().user());
+            aclObject.set_type(mesos::ACL::Entity::SOME);
+          } else if (object->framework_info != nullptr &&
+              object->framework_info->has_user()) {
+            aclObject.add_values(object->framework_info->user());
+            aclObject.set_type(mesos::ACL::Entity::SOME);
+          }
+
+          break;
+        }
         case authorization::UNKNOWN:
           LOG(WARNING) << "Authorization for action '" << action_
                        << "' is not defined and therefore not authorized";
@@ -479,6 +521,58 @@ private:
 };
 
 
+class LocalNestedContainerObjectApprover : public ObjectApprover
+{
+public:
+  LocalNestedContainerObjectApprover(
+      const GenericACLs& userAcls,
+      const GenericACLs& parentAcls,
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action,
+      bool permissive)
+    : childApprover_(userAcls, subject, action, permissive),
+      parentApprover_(parentAcls, subject, action, permissive) {}
+
+  // Launching Nested Containers and sessions in Nested Containers is
+  // authorized if a principal is allowed to launch nested container (sessions)
+  // under an executor running under a given OS user and, if a command
+  // is available, the principal is also allowed to run the command as
+  // the given OS user.
+  virtual Try<bool> approved(
+      const Option<ObjectApprover::Object>& object) const noexcept override
+  {
+    if (object.isNone() || object->command_info == nullptr) {
+      return parentApprover_.approved(object);
+    }
+
+    ObjectApprover::Object parentObject;
+    parentObject.executor_info = object->executor_info;
+    parentObject.framework_info = object->framework_info;
+
+    Try<bool> parentApproved = parentApprover_.approved(parentObject);
+
+    if (parentApproved.isError()) {
+      return parentApproved;
+    }
+
+    ObjectApprover::Object childObject;
+    childObject.command_info = object->command_info;
+
+    Try<bool> childApproved = childApprover_.approved(childObject);
+
+    if (childApproved.isError()) {
+      return childApproved;
+    }
+
+    return parentApproved.get() && childApproved.get();
+  }
+
+private:
+  LocalAuthorizerObjectApprover childApprover_;
+  LocalAuthorizerObjectApprover parentApprover_;
+};
+
+
 class LocalAuthorizerProcess : public ProtobufProcess<LocalAuthorizerProcess>
 {
 public:
@@ -537,6 +631,63 @@ public:
       });
   }
 
+  Future<Owned<ObjectApprover>> getNestedContainerObjectApprover(
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action)
+  {
+    CHECK(action == authorization::LAUNCH_NESTED_CONTAINER ||
+          action == authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+
+    vector<GenericACL> runAsUserAcls;
+    vector<GenericACL> parentRunningAsUserAcls;
+
+    if (action == authorization::LAUNCH_NESTED_CONTAINER) {
+      foreach (const ACL::LaunchNestedContainerAsUser& acl,
+               acls.launch_nested_containers_as_user()) {
+        GenericACL acl_;
+        acl_.subjects = acl.principals();
+        acl_.objects = acl.users();
+
+        runAsUserAcls.push_back(acl_);
+      }
+
+      foreach (const ACL::LaunchNestedContainerUnderParentWithUser& acl,
+               acls.launch_nested_containers_under_parent_with_user()) {
+        GenericACL acl_;
+        acl_.subjects = acl.principals();
+        acl_.objects = acl.users();
+
+        parentRunningAsUserAcls.push_back(acl_);
+      }
+    } else {
+      foreach (const ACL::LaunchNestedContainerSessionAsUser& acl,
+               acls.launch_nested_container_sessions_as_user()) {
+        GenericACL acl_;
+        acl_.subjects = acl.principals();
+        acl_.objects = acl.users();
+
+        runAsUserAcls.push_back(acl_);
+      }
+
+      foreach (const ACL::LaunchNestedContainerSessionUnderParentWithUser& acl,
+               acls.launch_nested_container_sessions_under_parent_with_user())
+      {
+        GenericACL acl_;
+        acl_.subjects = acl.principals();
+        acl_.objects = acl.users();
+
+        parentRunningAsUserAcls.push_back(acl_);
+      }
+    }
+
+    return Owned<ObjectApprover>(new LocalNestedContainerObjectApprover(
+        runAsUserAcls,
+        parentRunningAsUserAcls,
+        subject,
+        action,
+        acls.permissive()));
+  }
+
   Future<Owned<ObjectApprover>> getObjectApprover(
       const Option<authorization::Subject>& subject,
       const authorization::Action& action)
@@ -552,6 +703,11 @@ public:
       }
     };
 
+    if (action == authorization::LAUNCH_NESTED_CONTAINER ||
+        action == authorization::LAUNCH_NESTED_CONTAINER_SESSION) {
+      return getNestedContainerObjectApprover(subject, action);
+    }
+
     // Generate GenericACLs.
     Result<GenericACLs> genericACLs = createGenericACLs(action, acls);
     if (genericACLs.isError()) {
@@ -764,6 +920,58 @@ private:
         return acls_;
         break;
       }
+      case authorization::ATTACH_CONTAINER_INPUT: {
+        foreach (const ACL::AttachContainerInput& acl,
+            acls.attach_containers_input()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.users();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
+        break;
+      }
+      case authorization::ATTACH_CONTAINER_OUTPUT: {
+        foreach (const ACL::AttachContainerOutput& acl,
+            acls.attach_containers_output()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.users();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
+        break;
+      }
+      case authorization::KILL_NESTED_CONTAINER: {
+        foreach (const ACL::KillNestedContainer& acl,
+            acls.kill_nested_containers()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.users();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
+        break;
+      }
+      case authorization::WAIT_NESTED_CONTAINER: {
+        foreach (const ACL::WaitNestedContainer& acl,
+            acls.wait_nested_containers()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.users();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
+        break;
+      }
       case authorization::VIEW_FRAMEWORK:
         foreach (const ACL::ViewFramework& acl, acls.view_frameworks()) {
           GenericACL acl_;
@@ -797,6 +1005,11 @@ private:
 
         return acls_;
         break;
+      case authorization::LAUNCH_NESTED_CONTAINER_SESSION:
+      case authorization::LAUNCH_NESTED_CONTAINER:
+        return Error("Extracting ACLs for launching nested containers requires "
+                     "a specialized function");
+        break;
       case authorization::UNKNOWN:
         // Cannot generate acls for an unknown action.
         return None();

http://git-wip-us.apache.org/repos/asf/mesos/blob/19296e0f/src/tests/authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authorization_tests.cpp b/src/tests/authorization_tests.cpp
index 511832d..8609371 100644
--- a/src/tests/authorization_tests.cpp
+++ b/src/tests/authorization_tests.cpp
@@ -2516,6 +2516,1213 @@ TYPED_TEST(AuthorizationTest, SandBoxAccess)
 }
 
 
+// This tests the authorization of launching sessions in nested containers.
+TYPED_TEST(AuthorizationTest, LaunchNestedContainerSessions)
+{
+  // Setup ACLs.
+  ACLs acls;
+
+  {
+    // "ops" can launch sessions in any parent container.
+    mesos::ACL::LaunchNestedContainerSessionUnderParentWithUser* acl =
+        acls.add_launch_nested_container_sessions_under_parent_with_user();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // "foo" cannot launch sessions in any container running as any user.
+    mesos::ACL::LaunchNestedContainerSessionUnderParentWithUser* acl =
+        acls.add_launch_nested_container_sessions_under_parent_with_user();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" can launch sessions nested under a container running as
+    // linux user "bar".
+    mesos::ACL::LaunchNestedContainerSessionUnderParentWithUser* acl =
+        acls.add_launch_nested_container_sessions_under_parent_with_user();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // No one else can launch sessions in nested containers.
+    mesos::ACL::LaunchNestedContainerSessionUnderParentWithUser* acl =
+        acls.add_launch_nested_container_sessions_under_parent_with_user();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "foo" principal cannot launch sessions as commands in any parent
+    // container. He may still get to launch container sessions if he
+    // is allowed to launch nested container sessions whose executors are
+    // running as a given user for which "foo" has permissions and the
+    // session uses a `container_info` instead of a `command_info`.
+    mesos::ACL::LaunchNestedContainerSessionAsUser* acl =
+        acls.add_launch_nested_container_sessions_as_user();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" principal can launch sessions running as user "bar".
+    mesos::ACL::LaunchNestedContainerSessionAsUser* acl =
+        acls.add_launch_nested_container_sessions_as_user();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // "ops" principal can launch sessions as any linux user.
+    mesos::ACL::LaunchNestedContainerSessionAsUser* acl =
+        acls.add_launch_nested_container_sessions_as_user();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // No one else can launch sessions as any user.
+    mesos::ACL::LaunchNestedContainerSessionAsUser* acl =
+        acls.add_launch_nested_container_sessions_as_user();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Create ExecutorInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  ExecutorInfo executorInfo;
+  {
+    executorInfo.set_name("Task");
+    executorInfo.mutable_executor_id()->set_value("t");
+    executorInfo.mutable_command()->set_value("echo hello");
+    executorInfo.mutable_command()->set_user("user");
+  }
+
+  // Create ExecutorInfo with user "bar" in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoBar;
+  {
+    executorInfoBar.set_name("Executor");
+    executorInfoBar.mutable_executor_id()->set_value("e");
+    executorInfoBar.mutable_command()->set_value("echo hello");
+    executorInfoBar.mutable_command()->set_user("bar");
+  }
+
+  // Create ExecutorInfo with no user in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoNoUser;
+  {
+    executorInfoNoUser.set_name("Executor");
+    executorInfoNoUser.mutable_executor_id()->set_value("s");
+    executorInfoNoUser.mutable_command()->set_value("echo hello");
+  }
+
+  // Create ExecutorInfo with no command as object to be
+  // authorized.
+  ExecutorInfo executorInfoNoCommand;
+  {
+    executorInfoNoCommand.set_name("Executor");
+    executorInfoNoCommand.mutable_executor_id()->set_value("t");
+  }
+
+  // Principal "foo" cannot launch a session with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot launch a session with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a session with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a session with a request with the
+  // default `FrameworkInfo.user`.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a session with a request where the
+  // executor is running as a container, it falls back to the
+  // `FrameworkInfo.user`.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoCommand);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can launch a session as user "bar" under parent containers
+  // running as user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Create CommandInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  CommandInfo commandInfo;
+  {
+    commandInfo.set_value("echo hello");
+    commandInfo.set_user("user");
+  }
+
+  // Create CommandInfo with user "bar" in command as object to
+  // be authorized.
+  CommandInfo commandInfoBar;
+  {
+    commandInfoBar.set_value("echo hello");
+    commandInfoBar.set_user("bar");
+  }
+
+  // Create CommandInfo with no user in command as object to
+  // be authorized.
+  CommandInfo commandInfoNoUser;
+  {
+    commandInfoNoUser.set_value("echo hello");
+  }
+
+  // Principal "foo" cannot launch a session with a request with
+  // ExecutorInfo and CommandInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot launch a session with a request with
+  // CommandInfo running with user "user", even if the ExecutorInfo
+  // is running as "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can launch a session with a request with
+  // CommandInfo and ExecutorInfo running with user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch sessions with any combination of requests.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoNoUser);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoNoUser);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+}
+
+
+// This tests the authorization of attaching to the input stream of a container.
+TYPED_TEST(AuthorizationTest, AttachContainerInput)
+{
+  // Setup ACLs.
+  ACLs acls;
+
+  {
+    // "foo" principal cannot attach to the input of any container.
+    mesos::ACL::AttachContainerInput* acl =
+        acls.add_attach_containers_input();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" principal can attach to the input of containers running under
+    // user "bar".
+    mesos::ACL::AttachContainerInput* acl =
+        acls.add_attach_containers_input();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // "ops" principal can attach to the input of all containers.
+    mesos::ACL::AttachContainerInput* acl =
+        acls.add_attach_containers_input();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // No one else can attach to the input of any container.
+    mesos::ACL::AttachContainerInput* acl =
+        acls.add_attach_containers_input();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Create ExecutorInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  ExecutorInfo executorInfo;
+  {
+    executorInfo.set_name("Task");
+    executorInfo.mutable_executor_id()->set_value("t");
+    executorInfo.mutable_command()->set_value("echo hello");
+    executorInfo.mutable_command()->set_user("user");
+  }
+
+  // Create ExecutorInfo with user "bar" in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoBar;
+  {
+    executorInfoBar.set_name("Executor");
+    executorInfoBar.mutable_executor_id()->set_value("e");
+    executorInfoBar.mutable_command()->set_value("echo hello");
+    executorInfoBar.mutable_command()->set_user("bar");
+  }
+
+  // Create ExecutorInfo with no user in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoNoUser;
+  {
+    executorInfoNoUser.set_name("Executor");
+    executorInfoNoUser.mutable_executor_id()->set_value("e");
+    executorInfoNoUser.mutable_command()->set_value("echo hello");
+  }
+
+  // Principal "foo" cannot attach to the input of a container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_INPUT);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot attach to the input of a container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_INPUT);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can attach to the input of a container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_INPUT);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can attach to the input of a container with a request with
+  // ExecutorInfo running with user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_INPUT);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can attach to the input of a container with a request with
+  // an ExecutorInfo without user.
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_INPUT);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+}
+
+
+// This tests the authorization of attaching to the output stream of a
+// container.
+TYPED_TEST(AuthorizationTest, AttachContainerOutput)
+{
+  // Setup ACLs.
+  ACLs acls;
+
+  {
+    // "foo" principal cannot attach to the output of any container.
+    mesos::ACL::AttachContainerOutput* acl =
+        acls.add_attach_containers_output();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" principal can attach to the output of containers running under
+    // user "bar".
+    mesos::ACL::AttachContainerOutput* acl =
+        acls.add_attach_containers_output();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // "ops" principal can attach to the output of all containers.
+    mesos::ACL::AttachContainerOutput* acl =
+        acls.add_attach_containers_output();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // No one else can attach to the output of any container.
+    mesos::ACL::AttachContainerOutput* acl =
+        acls.add_attach_containers_output();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Create ExecutorInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  ExecutorInfo executorInfo;
+  {
+    executorInfo.set_name("Task");
+    executorInfo.mutable_executor_id()->set_value("t");
+    executorInfo.mutable_command()->set_value("echo hello");
+    executorInfo.mutable_command()->set_user("user");
+  }
+
+  // Create ExecutorInfo with user "bar" in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoBar;
+  {
+    executorInfoBar.set_name("Executor");
+    executorInfoBar.mutable_executor_id()->set_value("e");
+    executorInfoBar.mutable_command()->set_value("echo hello");
+    executorInfoBar.mutable_command()->set_user("bar");
+  }
+
+  // Create ExecutorInfo with no user in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoNoUser;
+  {
+    executorInfoNoUser.set_name("Executor");
+    executorInfoNoUser.mutable_executor_id()->set_value("e");
+    executorInfoNoUser.mutable_command()->set_value("echo hello");
+  }
+
+  // Principal "foo" cannot attach to the output of a container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_OUTPUT);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot attach to the output of a container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_OUTPUT);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can attach to the output of a container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_OUTPUT);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can attach to the output of a container with a request with
+  // ExecutorInfo running with user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_OUTPUT);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can attach to the output of a container with a request with
+  // an ExecutorInfo without user.
+  {
+    authorization::Request request;
+    request.set_action(authorization::ATTACH_CONTAINER_OUTPUT);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+}
+
+
+// This tests the authorization of launching nested containers.
+TYPED_TEST(AuthorizationTest, LaunchNestedContainers)
+{
+  // Setup ACLs.
+  ACLs acls;
+
+  {
+    // "ops" can launch nested containers whose executor runs as any user.
+    mesos::ACL::LaunchNestedContainerUnderParentWithUser* acl =
+        acls.add_launch_nested_containers_under_parent_with_user();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // "foo" cannot launch nested containers.
+    mesos::ACL::LaunchNestedContainerUnderParentWithUser* acl =
+        acls.add_launch_nested_containers_under_parent_with_user();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" can launch nested containers under executors running as
+    // linux user "bar".
+    mesos::ACL::LaunchNestedContainerUnderParentWithUser* acl =
+        acls.add_launch_nested_containers_under_parent_with_user();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // No one else can launch nested containers.
+    mesos::ACL::LaunchNestedContainerUnderParentWithUser* acl =
+        acls.add_launch_nested_containers_under_parent_with_user();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "foo" principal cannot launch nested containers as commands
+    // under any parent container. He may still get to launch nested
+    // containers if he is allowed to do so for executors which run as
+    // a given user for which "foo" has permissions and the nested
+    // container uses a `container_info` instead of a `command_info`.
+    mesos::ACL::LaunchNestedContainerAsUser* acl =
+        acls.add_launch_nested_containers_as_user();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" principal can launch nested containers as user "bar".
+    mesos::ACL::LaunchNestedContainerAsUser* acl =
+        acls.add_launch_nested_containers_as_user();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // "ops" principal can launch nested containers as any linux user.
+    mesos::ACL::LaunchNestedContainerAsUser* acl =
+        acls.add_launch_nested_containers_as_user();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // No one else can launch nested containers as any user.
+    mesos::ACL::LaunchNestedContainerAsUser* acl =
+        acls.add_launch_nested_containers_as_user();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Create ExecutorInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  ExecutorInfo executorInfo;
+  {
+    executorInfo.set_name("Task");
+    executorInfo.mutable_executor_id()->set_value("t");
+    executorInfo.mutable_command()->set_value("echo hello");
+    executorInfo.mutable_command()->set_user("user");
+  }
+
+  // Create ExecutorInfo with user "bar" in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoBar;
+  {
+    executorInfoBar.set_name("Executor");
+    executorInfoBar.mutable_executor_id()->set_value("e");
+    executorInfoBar.mutable_command()->set_value("echo hello");
+    executorInfoBar.mutable_command()->set_user("bar");
+  }
+
+  // Create ExecutorInfo with no user in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoNoUser;
+  {
+    executorInfoNoUser.set_name("Executor");
+    executorInfoNoUser.mutable_executor_id()->set_value("s");
+    executorInfoNoUser.mutable_command()->set_value("echo hello");
+  }
+
+  // Create ExecutorInfo with no command as object to be authorized.
+  ExecutorInfo executorInfoNoCommand;
+  {
+    executorInfoNoCommand.set_name("Executor");
+    executorInfoNoCommand.mutable_executor_id()->set_value("t");
+  }
+
+  // Principal "foo" cannot launch a nested container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot launch a nested container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a nested container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a nested container with a request with the
+  // default `FrameworkInfo.user`.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch a nested container with a request
+  // where the executor is running as a container, it falls back
+  // to the `FrameworkInfo.user`.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoCommand);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can launch a nested container as user "bar" under parent
+  // containers running as user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Create CommandInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  CommandInfo commandInfo;
+  {
+    commandInfo.set_value("echo hello");
+    commandInfo.set_user("user");
+  }
+
+  // Create CommandInfo with user "bar" in command as object to
+  // be authorized.
+  CommandInfo commandInfoBar;
+  {
+    commandInfoBar.set_value("echo hello");
+    commandInfoBar.set_user("bar");
+  }
+
+  // Create CommandInfo with no user in command as object to
+  // be authorized.
+  CommandInfo commandInfoNoUser;
+  {
+    commandInfoNoUser.set_value("echo hello");
+  }
+
+  // Principal "foo" cannot launch a nested container with a request with
+  // ExecutorInfo and CommandInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot launch a nested container with a request with
+  // CommandInfo running with user "user", even if the ExecutorInfo
+  // is running as "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can launch a nested container with a request with
+  // CommandInfo and ExecutorInfo running with user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can launch nested containers with any combination of
+  // requests.
+  {
+    authorization::Request request;
+    request.set_action(authorization::LAUNCH_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_command_info()->MergeFrom(commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoNoUser);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfo);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoBar);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_command_info()->MergeFrom(
+        commandInfoNoUser);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+}
+
+
+// This tests the authorization of waiting for a nested container.
+//
+// The ACLs configured below deny principal "foo" for every user, allow
+// principal "bar" only for user "bar", allow principal "ops" for any
+// user, and deny every remaining principal. Each request carries both
+// an `ExecutorInfo` and the `FrameworkInfo`.
+TYPED_TEST(AuthorizationTest, WaitNestedContainer)
+{
+  // Setup ACLs.
+  ACLs acls;
+
+  {
+    // "foo" principal cannot wait for any nested container.
+    mesos::ACL::WaitNestedContainer* acl =
+        acls.add_wait_nested_containers();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // "bar" principal can wait for nested containers running under
+    // user "bar".
+    mesos::ACL::WaitNestedContainer* acl =
+        acls.add_wait_nested_containers();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_users()->add_values("bar");
+  }
+
+  {
+    // "ops" principal can wait for all nested containers.
+    mesos::ACL::WaitNestedContainer* acl =
+        acls.add_wait_nested_containers();
+    acl->mutable_principals()->add_values("ops");
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // No one else can wait for any nested container.
+    mesos::ACL::WaitNestedContainer* acl =
+        acls.add_wait_nested_containers();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // The framework user is "user", which is not mentioned in any ACL.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Create ExecutorInfo with a user not mentioned in the ACLs in
+  // command as object to be authorized.
+  ExecutorInfo executorInfo;
+  {
+    executorInfo.set_name("Task");
+    executorInfo.mutable_executor_id()->set_value("t");
+    executorInfo.mutable_command()->set_value("echo hello");
+    executorInfo.mutable_command()->set_user("user");
+  }
+
+  // Create ExecutorInfo with user "bar" in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoBar;
+  {
+    executorInfoBar.set_name("Executor");
+    executorInfoBar.mutable_executor_id()->set_value("e");
+    executorInfoBar.mutable_command()->set_value("echo hello");
+    executorInfoBar.mutable_command()->set_user("bar");
+  }
+
+  // Create ExecutorInfo with no user in command as object to
+  // be authorized.
+  ExecutorInfo executorInfoNoUser;
+  {
+    executorInfoNoUser.set_name("Executor");
+    executorInfoNoUser.mutable_executor_id()->set_value("e");
+    executorInfoNoUser.mutable_command()->set_value("echo hello");
+  }
+
+  // Principal "foo" cannot wait for a nested container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::WAIT_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    // Attach the FrameworkInfo as well, so that this request has the
+    // same shape as every other request in this test.
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" cannot wait for a nested container with a request
+  // with ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::WAIT_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_FALSE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can wait for a nested container with a request with
+  // ExecutorInfo running with user "user".
+  {
+    authorization::Request request;
+    request.set_action(authorization::WAIT_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executorInfo);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "bar" can wait for a nested container with a request with
+  // ExecutorInfo running with user "bar".
+  {
+    authorization::Request request;
+    request.set_action(authorization::WAIT_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoBar);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+
+  // Principal "ops" can wait for a nested container with a request with
+  // an ExecutorInfo without user.
+  {
+    authorization::Request request;
+    request.set_action(authorization::WAIT_NESTED_CONTAINER);
+    request.mutable_subject()->set_value("ops");
+    request.mutable_object()->mutable_executor_info()->MergeFrom(
+        executorInfoNoUser);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+
+    AWAIT_EXPECT_TRUE(authorizer.get()->authorized(request));
+  }
+}
+
+
+// Checks how `KILL_NESTED_CONTAINER` requests are authorized against
+// the `kill_nested_containers` ACLs.
+TYPED_TEST(AuthorizationTest, KillNestedContainer)
+{
+  ACLs acls;
+
+  // Principal "foo" is not allowed to kill nested containers of any user.
+  {
+    mesos::ACL::KillNestedContainer* denyFoo =
+        acls.add_kill_nested_containers();
+    denyFoo->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+    denyFoo->mutable_principals()->add_values("foo");
+  }
+
+  // Principal "bar" may only kill nested containers of user "bar".
+  {
+    mesos::ACL::KillNestedContainer* allowBar =
+        acls.add_kill_nested_containers();
+    allowBar->mutable_users()->add_values("bar");
+    allowBar->mutable_principals()->add_values("bar");
+  }
+
+  // Principal "ops" may kill nested containers of any user.
+  {
+    mesos::ACL::KillNestedContainer* allowOps =
+        acls.add_kill_nested_containers();
+    allowOps->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+    allowOps->mutable_principals()->add_values("ops");
+  }
+
+  // Any other principal is not allowed to kill nested containers.
+  {
+    mesos::ACL::KillNestedContainer* denyRest =
+        acls.add_kill_nested_containers();
+    denyRest->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+    denyRest->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  // Instantiate the authorizer under test from the ACLs above.
+  Try<Authorizer*> created = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(created);
+  Owned<Authorizer> authorizer(created.get());
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_user("user");
+
+  // Executor whose command runs as "user", a user that no ACL mentions.
+  ExecutorInfo asUser;
+  asUser.mutable_executor_id()->set_value("t");
+  asUser.set_name("Task");
+  {
+    CommandInfo* command = asUser.mutable_command();
+    command->set_user("user");
+    command->set_value("echo hello");
+  }
+
+  // Executor whose command runs as user "bar".
+  ExecutorInfo asBar;
+  asBar.mutable_executor_id()->set_value("e");
+  asBar.set_name("Executor");
+  {
+    CommandInfo* command = asBar.mutable_command();
+    command->set_user("bar");
+    command->set_value("echo hello");
+  }
+
+  // Executor whose command does not set a user.
+  ExecutorInfo withoutUser;
+  withoutUser.mutable_executor_id()->set_value("e");
+  withoutUser.set_name("Executor");
+  withoutUser.mutable_command()->set_value("echo hello");
+
+  // Assembles a `KILL_NESTED_CONTAINER` request issued by `principal`
+  // whose object holds `executor` together with `frameworkInfo`.
+  auto killRequestFor = [&frameworkInfo](
+      const char* principal,
+      const ExecutorInfo& executor) {
+    authorization::Request request;
+    request.set_action(authorization::KILL_NESTED_CONTAINER);
+    request.mutable_subject()->set_value(principal);
+    request.mutable_object()->mutable_executor_info()->MergeFrom(executor);
+    request.mutable_object()->mutable_framework_info()->MergeFrom(
+        frameworkInfo);
+    return request;
+  };
+
+  // "foo" is rejected for an executor running as "user".
+  const authorization::Request fooKillsUser = killRequestFor("foo", asUser);
+  AWAIT_EXPECT_FALSE(authorizer->authorized(fooKillsUser));
+
+  // "bar" is rejected for an executor running as "user".
+  const authorization::Request barKillsUser = killRequestFor("bar", asUser);
+  AWAIT_EXPECT_FALSE(authorizer->authorized(barKillsUser));
+
+  // "ops" is accepted for an executor running as "user".
+  const authorization::Request opsKillsUser = killRequestFor("ops", asUser);
+  AWAIT_EXPECT_TRUE(authorizer->authorized(opsKillsUser));
+
+  // "bar" is accepted for an executor running as "bar".
+  const authorization::Request barKillsBar = killRequestFor("bar", asBar);
+  AWAIT_EXPECT_TRUE(authorizer->authorized(barKillsBar));
+
+  // "ops" is accepted for an executor whose command sets no user.
+  const authorization::Request opsKillsNoUser =
+    killRequestFor("ops", withoutUser);
+  AWAIT_EXPECT_TRUE(authorizer->authorized(opsKillsNoUser));
+}
+
+
 // This tests that a missing request.object is allowed for an ACL whose
 // Object is ANY.
 // NOTE: The only usecase for this behavior is currently teardownFramework.