You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2015/09/10 00:59:23 UTC

[3/3] mesos git commit: Added /reserve HTTP endpoint to the master.

Added /reserve HTTP endpoint to the master.

Review: https://reviews.apache.org/r/35702


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57a7e7d0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57a7e7d0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57a7e7d0

Branch: refs/heads/master
Commit: 57a7e7d0283aa08455d6572ade75a11d914c6962
Parents: e758d24
Author: Michael Park <mp...@apache.org>
Authored: Wed Aug 5 00:04:03 2015 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Sep 9 15:28:29 2015 -0700

----------------------------------------------------------------------
 src/master/http.cpp       | 126 +++++++++++++++++++++++++++++++++++++++++
 src/master/master.cpp     |  36 +++++++++---
 src/master/master.hpp     |  24 ++++++--
 src/master/validation.cpp |  14 ++---
 src/master/validation.hpp |   2 +-
 5 files changed, 181 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 94e97a2..bcf7f93 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -78,6 +78,7 @@ using process::USAGE;
 
 using process::http::Accepted;
 using process::http::BadRequest;
+using process::http::Conflict;
 using process::http::Forbidden;
 using process::http::InternalServerError;
 using process::http::MethodNotAllowed;
@@ -672,6 +673,131 @@ Future<Response> Master::Http::redirect(const Request& request) const
 }
 
 
+Future<Response> Master::Http::reserve(const Request& request) const
+{
+  if (request.method != "POST") {
+    return BadRequest("Expecting POST");
+  }
+
+  // Parse the query string in the request body.
+  Try<hashmap<string, string>> decode =
+    process::http::query::decode(request.body);
+
+  if (decode.isError()) {
+    return BadRequest("Unable to decode query string: " + decode.error());
+  }
+
+  const hashmap<string, string>& values = decode.get();
+
+  if (values.get("slaveId").isNone()) {
+    return BadRequest("Missing 'slaveId' query parameter");
+  }
+
+  SlaveID slaveId;
+  slaveId.set_value(values.get("slaveId").get());
+
+  Slave* slave = master->slaves.registered.get(slaveId);
+  if (slave == NULL) {
+    return BadRequest("No slave found with specified ID");
+  }
+
+  if (values.get("resources").isNone()) {
+    return BadRequest("Missing 'resources' query parameter");
+  }
+
+  Try<JSON::Array> parse =
+    JSON::parse<JSON::Array>(values.get("resources").get());
+
+  if (parse.isError()) {
+    return BadRequest(
+        "Error in parsing 'resources' query parameter: " + parse.error());
+  }
+
+  Resources resources;
+  foreach (const JSON::Value& value, parse.get().values) {
+    Try<Resource> resource = ::protobuf::parse<Resource>(value);
+    if (resource.isError()) {
+      return BadRequest(
+          "Error in parsing 'resources' query parameter: " + resource.error());
+    }
+    resources += resource.get();
+  }
+
+  Result<Credential> credential = authenticate(request);
+  if (credential.isError()) {
+    return Unauthorized("Mesos master", credential.error());
+  }
+
+  // Create an offer operation.
+  Offer::Operation operation;
+  operation.set_type(Offer::Operation::RESERVE);
+  operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
+
+  Option<string> principal =
+    credential.isSome() ? credential.get().principal() : Option<string>::none();
+
+  Option<Error> validate =
+    validation::operation::validate(operation.reserve(), None(), principal);
+
+  if (validate.isSome()) {
+    return BadRequest("Invalid RESERVE operation: " + validate.get().message);
+  }
+
+  // TODO(mpark): Add a reserve ACL for authorization.
+
+  // The resources recovered by rescinding outstanding offers.
+  Resources recovered;
+
+  // The unreserved resources needed to satisfy the RESERVE operation.
+  // This is used in an optimization where we try to only rescind
+  // offers that would contribute to satisfying the Reserve operation.
+  Resources remaining = resources.flatten();
+
+  // We pessimistically assume that what seems like "available"
+  // resources in the allocator will be gone. This can happen due to
+  // the race between the allocator scheduling an 'allocate' call to
+  // itself vs master's request to schedule 'updateAvailable'.
+  // We greedily rescind one offer at time until we've rescinded
+  // enough offers to cover for 'resources'.
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    // If rescinding the offer would not contribute to satisfying
+    // the remaining resources, skip it.
+    if (remaining == remaining - offer->resources()) {
+      continue;
+    }
+
+    recovered += offer->resources();
+    remaining -= offer->resources();
+
+    // We explicitly pass 'Filters()' which has a default 'refuse_sec'
+    // of 5 seconds rather than 'None()' here, so that we can
+    // virtually always win the race against 'allocate'.
+    master->allocator->recoverResources(
+        offer->framework_id(),
+        offer->slave_id(),
+        offer->resources(),
+        Filters());
+
+    master->removeOffer(offer, true); // Rescind!
+
+    // If we've rescinded enough offers to cover for 'resources',
+    // we're done.
+    Try<Resources> updatedRecovered = recovered.apply(operation);
+    if (updatedRecovered.isSome()) {
+      break;
+    }
+  }
+
+  // Propogate the 'Future<Nothing>' as 'Future<Response>' where
+  // 'Nothing' -> 'OK' and Failed -> 'Conflict'.
+  return master->apply(slave, operation)
+    .then([]() -> Response { return OK(); })
+    .repair([](const Future<Response>& result) {
+       return Conflict(result.failure());
+    });
+}
+
+
 const string Master::Http::SLAVES_HELP = HELP(
     TLDR(
         "Information about registered slaves."),

http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5589eca..ea7d613 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -780,6 +780,11 @@ void Master::initialize()
         [http](const process::http::Request& request) {
           return http.redirect(request);
         });
+  route("/reserve",
+        None(),  // TODO(mpark): Add an Http::RESERVE_HELP,
+        [http](const process::http::Request& request) {
+          return http.reserve(request);
+        });
   route("/roles.json",
         Http::ROLES_HELP,
         [http](const process::http::Request& request) {
@@ -2921,7 +2926,7 @@ void Master::_accept(
                   << operation.reserve().resources() << " from framework "
                   << *framework << " to slave " << *slave;
 
-        applyOfferOperation(framework, slave, operation);
+        apply(framework, slave, operation);
         break;
       }
 
@@ -2946,7 +2951,7 @@ void Master::_accept(
                   << operation.unreserve().resources() << " from framework "
                   << *framework << " to slave " << *slave;
 
-        applyOfferOperation(framework, slave, operation);
+        apply(framework, slave, operation);
         break;
       }
 
@@ -2972,7 +2977,7 @@ void Master::_accept(
                   << operation.create().volumes() << " from framework "
                   << *framework << " to slave " << *slave;
 
-        applyOfferOperation(framework, slave, operation);
+        apply(framework, slave, operation);
         break;
       }
 
@@ -2998,7 +3003,7 @@ void Master::_accept(
                   << operation.create().volumes() << " from framework "
                   << *framework << " to slave " << *slave;
 
-        applyOfferOperation(framework, slave, operation);
+        apply(framework, slave, operation);
         break;
       }
 
@@ -5721,7 +5726,7 @@ void Master::removeExecutor(
 }
 
 
-void Master::applyOfferOperation(
+void Master::apply(
     Framework* framework,
     Slave* slave,
     const Offer::Operation& operation)
@@ -5729,10 +5734,23 @@ void Master::applyOfferOperation(
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
 
-  allocator->updateAllocation(
-      framework->id(),
-      slave->id,
-      {operation});
+  allocator->updateAllocation(framework->id(), slave->id, {operation});
+
+  _apply(slave, operation);
+}
+
+
+Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
+{
+  CHECK_NOTNULL(slave);
+
+  return allocator->updateAvailable(slave->id, {operation})
+    .onReady(defer(self(), &Master::_apply, slave, operation));
+}
+
+
+void Master::_apply(Slave* slave, const Offer::Operation& operation) {
+  CHECK_NOTNULL(slave);
 
   slave->apply(operation);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e133185..7849d68 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -672,14 +672,24 @@ protected:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
-  // Updates slave's resources by applying the given operation. It
-  // also updates the allocator and sends a CheckpointResourcesMessage
-  // to the slave with slave's current checkpointed resources.
-  void applyOfferOperation(
+  // Updates the allocator and updates the slave's resources by
+  // applying the given operation. It also sends a
+  // 'CheckpointResourcesMessage' to the slave with the updated
+  // checkpointed resources.
+  void apply(
       Framework* framework,
       Slave* slave,
       const Offer::Operation& operation);
 
+  // Attempts to update the allocator by applying the given operation.
+  // If successful, updates the slave's resources, sends a
+  // 'CheckpointResourcesMessage' to the slave with the updated
+  // checkpointed resources, and returns a 'Future' with 'Nothing'.
+  // Otherwise, no action is taken and returns a failed 'Future'.
+  process::Future<Nothing> apply(
+      Slave* slave,
+      const Offer::Operation& operation);
+
   // Forwards the update to the framework.
   void forward(
       const StatusUpdate& update,
@@ -702,6 +712,8 @@ protected:
   Option<Credentials> credentials;
 
 private:
+  void _apply(Slave* slave, const Offer::Operation& operation);
+
   void drop(
       const process::UPID& from,
       const scheduler::Call& call,
@@ -810,6 +822,10 @@ private:
     process::Future<process::http::Response> redirect(
         const process::http::Request& request) const;
 
+    // /master/reserve
+    process::Future<process::http::Response> reserve(
+        const process::http::Request& request) const;
+
     // /master/roles.json
     process::Future<process::http::Response> roles(
         const process::http::Request& request) const;

http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index ffb7bf0..0361d1f 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -668,7 +668,7 @@ namespace operation {
 
 Option<Error> validate(
     const Offer::Operation::Reserve& reserve,
-    const string& role,
+    const Option<string>& role,
     const Option<string>& principal)
 {
   Option<Error> error = resource::validate(reserve.resources());
@@ -677,7 +677,7 @@ Option<Error> validate(
   }
 
   if (principal.isNone()) {
-    return Error("A framework without a principal cannot reserve resources.");
+    return Error("Cannot reserve resources without a principal.");
   }
 
   foreach (const Resource& resource, reserve.resources()) {
@@ -686,18 +686,18 @@ Option<Error> validate(
           "Resource " + stringify(resource) + " is not dynamically reserved");
     }
 
-    if (resource.role() != role) {
+    if (role.isSome() && resource.role() != role.get()) {
       return Error(
           "The reserved resource's role '" + resource.role() +
-          "' does not match the framework's role '" + role + "'");
+          "' does not match the framework's role '" + role.get() + "'");
     }
 
     if (resource.reservation().principal() != principal.get()) {
       return Error(
           "The reserved resource's principal '" +
-          stringify(resource.reservation().principal()) +
-          "' does not match the framework's principal '" +
-          stringify(principal.get()) + "'");
+          resource.reservation().principal() +
+          "' does not match the principal '" +
+          principal.get() + "'");
     }
 
     // NOTE: This check would be covered by 'contains' since there

http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 43b8d84..3434868 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -104,7 +104,7 @@ namespace operation {
 // Validates the RESERVE operation.
 Option<Error> validate(
     const Offer::Operation::Reserve& reserve,
-    const std::string& role,
+    const Option<std::string>& role,
     const Option<std::string>& principal);