You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2017/11/15 08:13:31 UTC

[07/15] mesos git commit: Implemented Standalone Container API.

Implemented Standalone Container API.

The Standalone and Nested Container APIs are very similar.
This commit combines the two API implementations by adding a
translation function (i.e. `launchNestedContainer` and
`launchContainer`) which unpacks the V1 protobuf into fields
which can be passed into a common function (i.e. `_launchContainer`).

The common functions authorize based on the type of container being
launched and it is possible to use both Standalone and Nested
Container APIs interchangeably for nested containers.

This approach is somewhat messy for the `WAIT_(NESTED_)CONTAINER`
calls, as these methods require different return protobufs based on
the original call.

Review: https://reviews.apache.org/r/62145


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9086ecef
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9086ecef
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9086ecef

Branch: refs/heads/master
Commit: 9086ecef4070b126b106c3f03b5763ce6ca444b4
Parents: 2d7ec26
Author: Joseph Wu <jo...@apache.org>
Authored: Wed Sep 6 14:53:05 2017 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Tue Nov 14 17:16:13 2017 -0800

----------------------------------------------------------------------
 src/slave/http.cpp | 647 ++++++++++++++++++++++++++++++++----------------
 src/slave/http.hpp |  44 +++-
 2 files changed, 472 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9086ecef/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 22cdac9..e4fa4df 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -599,6 +599,18 @@ Future<Response> Http::_api(
 
     case mesos::agent::Call::ATTACH_CONTAINER_OUTPUT:
       return attachContainerOutput(call, mediaTypes, principal);
+
+    case mesos::agent::Call::LAUNCH_CONTAINER:
+      return launchContainer(call, mediaTypes.accept, principal);
+
+    case mesos::agent::Call::WAIT_CONTAINER:
+      return waitContainer(call, mediaTypes.accept, principal);
+
+    case mesos::agent::Call::KILL_CONTAINER:
+      return killContainer(call, mediaTypes.accept, principal);
+
+    case mesos::agent::Call::REMOVE_CONTAINER:
+      return removeContainer(call, mediaTypes.accept, principal);
   }
 
   UNREACHABLE();
@@ -2350,76 +2362,114 @@ Future<Response> Http::launchNestedContainer(
   LOG(INFO) << "Processing LAUNCH_NESTED_CONTAINER call for container '"
             << call.launch_nested_container().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal, slave->authorizer, authorization::LAUNCH_NESTED_CONTAINER);
 
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _launchContainer(
+              call.launch_nested_container().container_id(),
+              call.launch_nested_container().command(),
+              None(),
+              call.launch_nested_container().has_container()
+                ? call.launch_nested_container().container()
+                : Option<ContainerInfo>::none(),
+              ContainerClass::DEFAULT,
+              acceptType,
+              authorizer);
+        }));
+}
 
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::LAUNCH_NESTED_CONTAINER);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
 
-  return approver
-    .then(defer(slave->self(), [=](const Owned<ObjectApprover>& approver) {
-      return _launchNestedContainer(
-          call.launch_nested_container().container_id(),
-          call.launch_nested_container().command(),
-          call.launch_nested_container().has_container()
-            ? call.launch_nested_container().container()
-            : Option<ContainerInfo>::none(),
-          ContainerClass::DEFAULT,
-          acceptType,
-          approver);
-    }));
+Future<Response> Http::launchContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::LAUNCH_CONTAINER, call.type());
+  CHECK(call.has_launch_container());
+
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal,
+        slave->authorizer,
+        call.launch_container().container_id().has_parent()
+          ? authorization::LAUNCH_NESTED_CONTAINER
+          : authorization::LAUNCH_STANDALONE_CONTAINER);
+
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer)
+          -> Future<Response> {
+          return _launchContainer(
+              call.launch_container().container_id(),
+              call.launch_container().command(),
+              call.launch_container().resources(),
+              call.launch_container().has_container()
+                ? call.launch_container().container()
+                : Option<ContainerInfo>::none(),
+              ContainerClass::DEFAULT,
+              acceptType,
+              authorizer);
+        }));
 }
 
 
-Future<Response> Http::_launchNestedContainer(
+Future<Response> Http::_launchContainer(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
+    const Option<Resources>& resources,
     const Option<ContainerInfo>& containerInfo,
     const Option<ContainerClass>& containerClass,
     ContentType acceptType,
-    const Owned<ObjectApprover>& approver) const
+    const Owned<AuthorizationAcceptor>& authorizer) const
 {
+  Option<string> user;
+
+  // Attempt to get the executor associated with this ContainerID.
+  // We only expect to get the executor when launching a nested container
+  // under a container launched via a scheduler. In other cases, we are
+  // launching a standalone container (possibly nested).
   Executor* executor = slave->getExecutor(containerId);
   if (executor == nullptr) {
-    return NotFound("Container " + stringify(containerId) + " cannot be found");
-  }
-
-  Framework* framework = slave->getFramework(executor->frameworkId);
-  CHECK_NOTNULL(framework);
+    if (!authorizer->accept()) {
+      return Forbidden();
+    }
+  } else {
+    Framework* framework = slave->getFramework(executor->frameworkId);
+    CHECK_NOTNULL(framework);
 
-  Try<bool> approved = approver->approved(
-      ObjectApprover::Object(
-          executor->info,
-          framework->info,
-          commandInfo,
-          containerId));
+    if (!authorizer->accept(
+            executor->info, framework->info, commandInfo, containerId)) {
+      return Forbidden();
+    }
 
-  if (approved.isError()) {
-    return Failure(approved.error());
-  } else if (!approved.get()) {
-    return Forbidden();
+    // By default, we use the executor's user.
+    // The CommandInfo can override it, if specified.
+    user = executor->user;
   }
 
-  // By default, we use the executor's user.
-  // The command user overrides it if specified.
-  Option<string> user = executor->user;
+  ContainerConfig containerConfig;
+  containerConfig.mutable_command_info()->CopyFrom(commandInfo);
 
 #ifndef __WINDOWS__
-  if (commandInfo.has_user()) {
-    user = commandInfo.user();
-  }
-#endif
+  if (slave->flags.switch_user) {
+    if (commandInfo.has_user()) {
+      user = commandInfo.user();
+    }
 
-  ContainerConfig containerConfig;
-  containerConfig.mutable_command_info()->CopyFrom(commandInfo);
+    if (user.isSome()) {
+      containerConfig.set_user(user.get());
+    }
+  }
+#endif // __WINDOWS__
 
-  if (user.isSome()) {
-    containerConfig.set_user(user.get());
+  if (resources.isSome()) {
+    containerConfig.mutable_resources()->CopyFrom(resources.get());
   }
 
   if (containerInfo.isSome()) {
@@ -2430,6 +2480,38 @@ Future<Response> Http::_launchNestedContainer(
     containerConfig.set_container_class(containerClass.get());
   }
 
+  // For standalone top-level containers, supply a sandbox directory.
+  if (!containerId.has_parent()) {
+    const string directory =
+      slave::paths::getContainerPath(slave->flags.work_dir, containerId);
+
+    // NOTE: The below partially mirrors logic executed before the agent calls
+    // `containerizer->launch`. See `slave::paths::createExecutorDirectory`.
+    Try<Nothing> mkdir = os::mkdir(directory);
+    if (mkdir.isError()) {
+      return InternalServerError(
+          "Failed to create sandbox directory: " + mkdir.error());
+    }
+
+// `os::chown()` is not available on Windows.
+#ifndef __WINDOWS__
+    if (containerConfig.has_user()) {
+      Try<Nothing> chown = os::chown(containerConfig.user(), directory);
+      if (chown.isError()) {
+        // Attempt to clean up, but since we've already failed to chown,
+        // we don't check the return value here.
+        os::rmdir(directory);
+
+        return InternalServerError(
+            "Failed to chown sandbox directory '" +
+            directory + "':" + chown.error());
+      }
+    }
+#endif // __WINDOWS__
+
+    containerConfig.set_directory(directory);
+  }
+
   Future<bool> launched = slave->containerizer->launch(
       containerId,
       containerConfig,
@@ -2450,7 +2532,7 @@ Future<Response> Http::_launchNestedContainer(
         return;
       }
 
-      LOG(WARNING) << "Failed to launch nested container "
+      LOG(WARNING) << "Failed to launch container "
                    << containerId << ": "
                    << (launch.isFailed() ? launch.failure() : "discarded");
 
@@ -2460,7 +2542,7 @@ Future<Response> Http::_launchNestedContainer(
             return;
           }
 
-          LOG(ERROR) << "Failed to destroy nested container "
+          LOG(ERROR) << "Failed to destroy container "
                      << containerId << " after launch failure: "
                      << (destroy.isFailed() ? destroy.failure() : "discarded");
         });
@@ -2487,86 +2569,147 @@ Future<Response> Http::waitNestedContainer(
   LOG(INFO) << "Processing WAIT_NESTED_CONTAINER call for container '"
             << call.wait_nested_container().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal, slave->authorizer, authorization::WAIT_NESTED_CONTAINER);
 
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _waitContainer(
+              call.wait_nested_container().container_id(),
+              acceptType,
+              authorizer,
+              true);
+        }));
+}
 
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::WAIT_NESTED_CONTAINER);
+
+Future<Response> Http::waitContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::WAIT_CONTAINER, call.type());
+  CHECK(call.has_wait_container());
+
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal,
+        slave->authorizer,
+        call.wait_container().container_id().has_parent()
+          ? authorization::WAIT_NESTED_CONTAINER
+          : authorization::WAIT_STANDALONE_CONTAINER);
+
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _waitContainer(
+              call.wait_container().container_id(),
+              acceptType,
+              authorizer,
+              false);
+        }));
+}
+
+
+Future<Response> Http::_waitContainer(
+    const ContainerID& containerId,
+    ContentType acceptType,
+    const Owned<AuthorizationAcceptor>& authorizer,
+    const bool deprecated) const
+{
+  // Attempt to get the executor associated with this ContainerID.
+  // We only expect to get the executor when waiting upon a nested container
+  // under a container launched via a scheduler. In other cases, we are
+  // waiting on a standalone container (possibly nested).
+  Executor* executor = slave->getExecutor(containerId);
+  if (executor == nullptr) {
+    if (!authorizer->accept()) {
+      return Forbidden();
+    }
   } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+    Framework* framework = slave->getFramework(executor->frameworkId);
+    CHECK_NOTNULL(framework);
+
+    if (!authorizer->accept(
+            executor->info,
+            framework->info,
+            containerId)) {
+      return Forbidden();
+    }
   }
 
-  return approver.then(defer(slave->self(),
-    [this, call, acceptType](const Owned<ObjectApprover>& waitApprover)
-        -> Future<Response> {
-      const ContainerID& containerId =
-        call.wait_nested_container().container_id();
-
-      Executor* executor = slave->getExecutor(containerId);
-      if (executor == nullptr) {
+  return slave->containerizer->wait(containerId)
+    .then([=](const Option<ContainerTermination>& termination) -> Response {
+      if (termination.isNone()) {
         return NotFound(
             "Container " + stringify(containerId) + " cannot be found");
       }
 
-      Framework* framework = slave->getFramework(executor->frameworkId);
-      CHECK_NOTNULL(framework);
+      mesos::agent::Response response;
 
-      Try<bool> approved = waitApprover->approved(
-          ObjectApprover::Object(
-              executor->info,
-              framework->info,
-              containerId));
+      // The response object depends on which API was originally used
+      // to make this call.
+      if (deprecated) {
+        response.set_type(mesos::agent::Response::WAIT_NESTED_CONTAINER);
 
-      if (approved.isError()) {
-        return Failure(approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+        mesos::agent::Response::WaitNestedContainer* waitNestedContainer =
+          response.mutable_wait_nested_container();
 
-      Future<Option<mesos::slave::ContainerTermination>> wait =
-        slave->containerizer->wait(containerId);
+        if (termination->has_status()) {
+          waitNestedContainer->set_exit_status(termination->status());
+        }
 
-      return wait
-        .then([containerId, acceptType](
-            const Option<ContainerTermination>& termination) -> Response {
-          if (termination.isNone()) {
-            return NotFound("Container " + stringify(containerId) +
-                            " cannot be found");
-          }
+        if (termination->has_state()) {
+          waitNestedContainer->set_state(termination->state());
+        }
 
-          mesos::agent::Response response;
-          response.set_type(mesos::agent::Response::WAIT_NESTED_CONTAINER);
+        if (termination->has_reason()) {
+          waitNestedContainer->set_reason(termination->reason());
+        }
 
-          mesos::agent::Response::WaitNestedContainer* waitNestedContainer =
-            response.mutable_wait_nested_container();
+        if (!termination->limited_resources().empty()) {
+          waitNestedContainer->mutable_limitation()->mutable_resources()
+            ->CopyFrom(termination->limited_resources());
+        }
 
-          if (termination->has_status()) {
-            waitNestedContainer->set_exit_status(termination->status());
-          }
+        if (termination->has_message()) {
+          waitNestedContainer->set_message(termination->message());
+        }
+      } else {
+        response.set_type(mesos::agent::Response::WAIT_CONTAINER);
 
-          if (termination->has_state()) {
-            waitNestedContainer->set_state(termination->state());
-          }
+        mesos::agent::Response::WaitContainer* waitContainer =
+          response.mutable_wait_container();
 
-          if (termination->has_reason()) {
-            waitNestedContainer->set_reason(termination->reason());
-          }
+        if (termination->has_status()) {
+          waitContainer->set_exit_status(termination->status());
+        }
 
-          if (!termination->limited_resources().empty()) {
-            waitNestedContainer->mutable_limitation()->mutable_resources()
-              ->CopyFrom(termination->limited_resources());
-          }
+        if (termination->has_state()) {
+          waitContainer->set_state(termination->state());
+        }
 
-          if (termination->has_message()) {
-            waitNestedContainer->set_message(termination->message());
-          }
+        if (termination->has_reason()) {
+          waitContainer->set_reason(termination->reason());
+        }
 
-          return OK(serialize(acceptType, evolve(response)),
-                    stringify(acceptType));
-        });
-    }));
+        if (!termination->limited_resources().empty()) {
+          waitContainer->mutable_limitation()->mutable_resources()
+            ->CopyFrom(termination->limited_resources());
+        }
+
+        if (termination->has_message()) {
+          waitContainer->set_message(termination->message());
+        }
+      }
+
+      return OK(serialize(acceptType, evolve(response)),
+                stringify(acceptType));
+    });
 }
 
 
@@ -2581,61 +2724,101 @@ Future<Response> Http::killNestedContainer(
   LOG(INFO) << "Processing KILL_NESTED_CONTAINER call for container '"
             << call.kill_nested_container().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal, slave->authorizer, authorization::KILL_NESTED_CONTAINER);
 
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::KILL_NESTED_CONTAINER);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  // SIGKILL is used by default if a signal is not specified.
+  int signal = SIGKILL;
+  if (call.kill_nested_container().has_signal()) {
+    signal = call.kill_nested_container().signal();
   }
 
-  return approver.then(defer(slave->self(),
-    [this, call](const Owned<ObjectApprover>& killApprover)
-        -> Future<Response> {
-      const ContainerID& containerId =
-        call.kill_nested_container().container_id();
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _killContainer(
+              call.kill_nested_container().container_id(),
+              signal,
+              acceptType,
+              authorizer);
+        }));
+}
 
-      // SIGKILL is used by default if a signal is not specified.
-      int signal = SIGKILL;
-      if (call.kill_nested_container().has_signal()) {
-        signal = call.kill_nested_container().signal();
-      }
 
-      Executor* executor = slave->getExecutor(containerId);
-      if (executor == nullptr) {
-        return NotFound(
-            "Container " + stringify(containerId) + " cannot be found");
-      }
+Future<Response> Http::killContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::KILL_CONTAINER, call.type());
+  CHECK(call.has_kill_container());
 
-      Framework* framework = slave->getFramework(executor->frameworkId);
-      CHECK_NOTNULL(framework);
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal,
+        slave->authorizer,
+        call.kill_container().container_id().has_parent()
+          ? authorization::KILL_NESTED_CONTAINER
+          : authorization::KILL_STANDALONE_CONTAINER);
 
-      Try<bool> approved = killApprover->approved(
-          ObjectApprover::Object(
-              executor->info,
-              framework->info,
-              containerId));
+  // SIGKILL is used by default if a signal is not specified.
+  int signal = SIGKILL;
+  if (call.kill_container().has_signal()) {
+    signal = call.kill_container().signal();
+  }
 
-      if (approved.isError()) {
-        return Failure(approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _killContainer(
+              call.kill_container().container_id(),
+              signal,
+              acceptType,
+              authorizer);
+        }));
+}
 
-      Future<bool> kill = slave->containerizer->kill(containerId, signal);
 
-      return kill
-        .then([containerId](bool found) -> Response {
-          if (!found) {
-            return NotFound("Container '" + stringify(containerId) + "'"
-                            " cannot be found (or is already killed)");
-          }
-          return OK();
-        });
-    }));
+Future<Response> Http::_killContainer(
+    const ContainerID& containerId,
+    const int signal,
+    ContentType acceptType,
+    const Owned<AuthorizationAcceptor>& authorizer) const
+{
+  // Attempt to get the executor associated with this ContainerID.
+  // We only expect to get the executor when killing a nested container
+  // under a container launched via a scheduler. In other cases, we are
+  // killing a standalone container (possibly nested).
+  Executor* executor = slave->getExecutor(containerId);
+  if (executor == nullptr) {
+    if (!authorizer->accept()) {
+      return Forbidden();
+    }
+  } else {
+    Framework* framework = slave->getFramework(executor->frameworkId);
+    CHECK_NOTNULL(framework);
+
+    if (!authorizer->accept(
+            executor->info,
+            framework->info,
+            containerId)) {
+      return Forbidden();
+    }
+  }
+
+  Future<bool> kill = slave->containerizer->kill(containerId, signal);
+
+  return kill
+    .then([containerId](bool found) -> Response {
+      if (!found) {
+        return NotFound("Container '" + stringify(containerId) + "'"
+                        " cannot be found (or is already killed)");
+      }
+      return OK();
+    });
 }
 
 
@@ -2650,56 +2833,88 @@ Future<Response> Http::removeNestedContainer(
   LOG(INFO) << "Processing REMOVE_NESTED_CONTAINER call for container '"
             << call.remove_nested_container().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal, slave->authorizer, authorization::REMOVE_NESTED_CONTAINER);
 
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _removeContainer(
+              call.remove_nested_container().container_id(),
+              acceptType,
+              authorizer);
+        }));
+}
 
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::REMOVE_NESTED_CONTAINER);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
 
-  return approver.then(defer(slave->self(),
-    [this, call](const Owned<ObjectApprover>& removeApprover)
-        -> Future<Response> {
-      const ContainerID& containerId =
-        call.remove_nested_container().container_id();
+Future<Response> Http::removeContainer(
+    const mesos::agent::Call& call,
+    ContentType acceptType,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::REMOVE_CONTAINER, call.type());
+  CHECK(call.has_remove_container());
 
-      Executor* executor = slave->getExecutor(containerId);
-      if (executor == nullptr) {
-        return OK();
-      }
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal,
+        slave->authorizer,
+        call.remove_container().container_id().has_parent()
+          ? authorization::REMOVE_NESTED_CONTAINER
+          : authorization::REMOVE_STANDALONE_CONTAINER);
 
-      Framework* framework = slave->getFramework(executor->frameworkId);
-      CHECK_NOTNULL(framework);
+  return authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _removeContainer(
+              call.remove_container().container_id(),
+              acceptType,
+              authorizer);
+        }));
+}
 
-      Try<bool> approved = removeApprover->approved(
-          ObjectApprover::Object(
-              executor->info,
-              framework->info,
-              containerId));
 
-      if (approved.isError()) {
-        return Failure(approved.error());
-      } else if (!approved.get()) {
-        return Forbidden();
-      }
+Future<Response> Http::_removeContainer(
+    const ContainerID& containerId,
+    ContentType acceptType,
+    const Owned<AuthorizationAcceptor>& authorizer) const
+{
+  // Attempt to get the executor associated with this ContainerID.
+  // We only expect to get the executor when removing a nested container
+  // under a container launched via a scheduler. In other cases, we are
+  // removing a standalone container (possibly nested).
+  Executor* executor = slave->getExecutor(containerId);
+  if (executor == nullptr) {
+    if (!authorizer->accept()) {
+      return Forbidden();
+    }
+  } else {
+    Framework* framework = slave->getFramework(executor->frameworkId);
+    CHECK_NOTNULL(framework);
+
+    if (!authorizer->accept(
+            executor->info,
+            framework->info,
+            containerId)) {
+      return Forbidden();
+    }
+  }
 
-      Future<Nothing> remove = slave->containerizer->remove(containerId);
+  Future<Nothing> remove = slave->containerizer->remove(containerId);
 
-      return remove.then(
-          [containerId](const Future<Nothing>& result) -> Response {
-            if (result.isFailed()) {
-              LOG(ERROR) << "Failed to remove nested container " << containerId
-                         << ": " << result.failure();
-              return InternalServerError(result.failure());
-            }
+  return remove
+    .then([=](const Future<Nothing>& result) -> Response {
+      if (result.isFailed()) {
+        LOG(ERROR) << "Failed to remove container " << containerId
+                   << ": " << result.failure();
+        return InternalServerError(result.failure());
+      }
 
-            return OK();
-          });
-    }));
+      return OK();
+    });
 }
 
 
@@ -2872,29 +3087,27 @@ Future<Response> Http::launchNestedContainerSession(
   LOG(INFO) << "Processing LAUNCH_NESTED_CONTAINER_SESSION call for container '"
             << call.launch_nested_container_session().container_id() << "'";
 
-  Future<Owned<ObjectApprover>> approver;
-
-  if (slave->authorizer.isSome()) {
-    Option<authorization::Subject> subject = createSubject(principal);
-
-    approver = slave->authorizer.get()->getObjectApprover(
-        subject, authorization::LAUNCH_NESTED_CONTAINER_SESSION);
-  } else {
-    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
-  }
+  Future<Owned<AuthorizationAcceptor>> authorizer =
+    AuthorizationAcceptor::create(
+        principal,
+        slave->authorizer,
+        authorization::LAUNCH_NESTED_CONTAINER_SESSION);
 
-  Future<Response> response = approver
-    .then(defer(slave->self(), [=](const Owned<ObjectApprover>& approver) {
-      return _launchNestedContainer(
-          call.launch_nested_container_session().container_id(),
-          call.launch_nested_container_session().command(),
-          call.launch_nested_container_session().has_container()
-            ? call.launch_nested_container_session().container()
-            : Option<ContainerInfo>::none(),
-          ContainerClass::DEBUG,
-          mediaTypes.accept,
-          approver);
-    }));
+  Future<Response> response = authorizer
+    .then(defer(
+        slave->self(),
+        [=](const Owned<AuthorizationAcceptor>& authorizer) {
+          return _launchContainer(
+              call.launch_nested_container_session().container_id(),
+              call.launch_nested_container_session().command(),
+              None(),
+              call.launch_nested_container_session().has_container()
+                ? call.launch_nested_container_session().container()
+                : Option<ContainerInfo>::none(),
+              ContainerClass::DEBUG,
+              mediaTypes.accept,
+              authorizer);
+        }));
 
   // Helper to destroy the container.
   auto destroy = [this](const ContainerID& containerId) {
@@ -2906,7 +3119,7 @@ Future<Response> Http::launchNestedContainerSession(
   };
 
   // If `response` has failed or is not `OK`, the container will be
-  // destroyed by `_launchNestedContainer`.
+  // destroyed by `_launchContainer`.
   return response
     .then(defer(slave->self(),
                 [=](const Response& response) -> Future<Response> {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9086ecef/src/slave/http.hpp
----------------------------------------------------------------------
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 44a95de..a51831c 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -28,6 +28,8 @@
 
 #include <mesos/authorizer/authorizer.hpp>
 
+#include "common/http.hpp"
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -213,29 +215,67 @@ private:
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
-  process::Future<process::http::Response> _launchNestedContainer(
+  process::Future<process::http::Response> launchContainer(
+      const mesos::agent::Call& call,
+      ContentType acceptType,
+      const Option<process::http::authentication::Principal>& principal) const;
+
+  process::Future<process::http::Response> _launchContainer(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
+      const Option<Resources>& resources,
       const Option<ContainerInfo>& containerInfo,
       const Option<mesos::slave::ContainerClass>& containerClass,
       ContentType acceptType,
-      const process::Owned<ObjectApprover>& approver) const;
+      const process::Owned<AuthorizationAcceptor>& authorizer) const;
 
   process::Future<process::http::Response> waitNestedContainer(
       const mesos::agent::Call& call,
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  process::Future<process::http::Response> waitContainer(
+      const mesos::agent::Call& call,
+      ContentType acceptType,
+      const Option<process::http::authentication::Principal>& principal) const;
+
+  process::Future<process::http::Response> _waitContainer(
+      const ContainerID& containerId,
+      ContentType acceptType,
+      const process::Owned<AuthorizationAcceptor>& authorizer,
+      const bool deprecated) const;
+
   process::Future<process::http::Response> killNestedContainer(
       const mesos::agent::Call& call,
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  process::Future<process::http::Response> killContainer(
+      const mesos::agent::Call& call,
+      ContentType acceptType,
+      const Option<process::http::authentication::Principal>& principal) const;
+
+  process::Future<process::http::Response> _killContainer(
+      const ContainerID& containerId,
+      const int signal,
+      ContentType acceptType,
+      const process::Owned<AuthorizationAcceptor>& authorizer) const;
+
   process::Future<process::http::Response> removeNestedContainer(
       const mesos::agent::Call& call,
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  process::Future<process::http::Response> removeContainer(
+      const mesos::agent::Call& call,
+      ContentType acceptType,
+      const Option<process::http::authentication::Principal>& principal) const;
+
+  process::Future<process::http::Response> _removeContainer(
+      const ContainerID& containerId,
+      ContentType acceptType,
+      const process::Owned<AuthorizationAcceptor>& authorizer) const;
+
   process::Future<process::http::Response> launchNestedContainerSession(
       const mesos::agent::Call& call,
       const RequestMediaTypes& mediaTypes,