You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/03/09 08:26:12 UTC

[4/4] mesos git commit: Introduced HTTP endpoint /weights for updating weight.

Introduced HTTP endpoint /weights for updating weight.

Review: https://reviews.apache.org/r/41681/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0fc4fd6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0fc4fd6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0fc4fd6

Branch: refs/heads/master
Commit: d0fc4fd6510f70d540caccbac3014984f08696ef
Parents: 7372385
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Tue Mar 8 14:57:22 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Tue Mar 8 23:18:41 2016 -0800

----------------------------------------------------------------------
 include/mesos/authorizer/authorizer.hpp   |  14 +++
 include/mesos/authorizer/authorizer.proto |  10 ++
 src/CMakeLists.txt                        |   1 +
 src/Makefile.am                           |   1 +
 src/authorizer/local/authorizer.cpp       |  29 +++++
 src/authorizer/local/authorizer.hpp       |   2 +
 src/master/http.cpp                       |  30 +++++
 src/master/master.cpp                     |  64 ++++++++++
 src/master/master.hpp                     |  94 ++++++++++++++-
 src/master/registry.proto                 |   8 ++
 src/master/weights_handler.cpp            | 160 +++++++++++++++++++++++++
 src/tests/mesos.cpp                       |   3 +
 src/tests/mesos.hpp                       |   2 +
 13 files changed, 417 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/include/mesos/authorizer/authorizer.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.hpp b/include/mesos/authorizer/authorizer.hpp
index ec6c992..7054826 100644
--- a/include/mesos/authorizer/authorizer.hpp
+++ b/include/mesos/authorizer/authorizer.hpp
@@ -201,6 +201,20 @@ public:
   virtual process::Future<bool> authorize(
       const ACL::RemoveQuota& request) = 0;
 
+  /**
+   * Verifies whether a principal can update the weight for the specific roles.
+   *
+   * @param request `ACL::UpdateWeights` packing all the parameters needed to
+   *     verify if the given principal is allowed to update the weight of the
+   *     specified roles.
+   *
+   * @return true if the principal is allowed to update the weight for every
+   *     specified role, false otherwise. A failed future indicates a problem
+   *     processing the request; the request can be retried.
+   */
+  virtual process::Future<bool> authorize(
+      const ACL::UpdateWeights& request) = 0;
+
 protected:
   Authorizer() {}
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 723da93..d65377c 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -135,6 +135,15 @@ message ACL {
     // Objects: Principal of the entity that set the quota.
     required Entity quota_principals = 2;
   }
+
+  // Which principals are authorized to update weights for the given roles.
+  message UpdateWeights {
+    // Subjects: Operator username.
+    required Entity principals = 1;
+
+    // Objects: The list of roles whose weights can be updated.
+    optional Entity roles = 2;
+  }
 }
 
 
@@ -171,4 +180,5 @@ message ACLs {
   repeated ACL.SetQuota set_quotas = 9;
   repeated ACL.RemoveQuota remove_quotas = 10;
   repeated ACL.TeardownFramework teardown_frameworks = 11;
+  repeated ACL.UpdateWeights update_weights = 12;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8f57a57..b78d0f4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -208,6 +208,7 @@ set(MASTER_SRC
   master/registrar.cpp
   master/repairer.cpp
   master/validation.cpp
+  master/weights_handler.cpp
   master/allocator/allocator.cpp
   master/allocator/mesos/hierarchical.cpp
   master/allocator/mesos/metrics.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a41e95d..b449343 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -627,6 +627,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   master/registrar.cpp							\
   master/repairer.cpp							\
   master/validation.cpp							\
+  master/weights_handler.cpp						\
   master/allocator/allocator.cpp					\
   master/allocator/mesos/hierarchical.cpp				\
   master/allocator/mesos/metrics.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index 4e5c2c2..15c857d 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -206,6 +206,21 @@ public:
     return acls.permissive(); // None of the ACLs match.
   }
 
+  Future<bool> authorize(const ACL::UpdateWeights& request)
+  {
+    foreach (const ACL::UpdateWeights& acl, acls.update_weights()) {
+      // ACL matches if both subjects and objects match.
+      if (matches(request.principals(), acl.principals()) &&
+          matches(request.roles(), acl.roles())) {
+        // ACL is allowed if both subjects and objects are allowed.
+        return allows(request.principals(), acl.principals()) &&
+               allows(request.roles(), acl.roles());
+      }
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+
 private:
   // Match matrix:
   //
@@ -494,5 +509,19 @@ Future<bool> LocalAuthorizer::authorize(const ACL::RemoveQuota& request)
       process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
 }
 
+
+Future<bool> LocalAuthorizer::authorize(const ACL::UpdateWeights& request)
+{
+  if (process == NULL) {
+    return Failure("Authorizer not initialized");
+  }
+
+  // Necessary to disambiguate.
+  typedef Future<bool>(LocalAuthorizerProcess::*F)(const ACL::UpdateWeights&);
+
+  return dispatch(
+      process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
+}
+
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/authorizer/local/authorizer.hpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.hpp b/src/authorizer/local/authorizer.hpp
index 96baf77..c87a991 100644
--- a/src/authorizer/local/authorizer.hpp
+++ b/src/authorizer/local/authorizer.hpp
@@ -69,6 +69,8 @@ public:
       const ACL::SetQuota& request);
   virtual process::Future<bool> authorize(
       const ACL::RemoveQuota& request);
+  virtual process::Future<bool> authorize(
+      const ACL::UpdateWeights& request);
 
 private:
   LocalAuthorizer();

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a3ad57a..7304bfd 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1157,6 +1157,36 @@ Future<Response> Master::Http::quota(
 }
 
 
+string Master::Http::WEIGHTS_HELP()
+{
+  return HELP(
+    TLDR(
+        "Updates weights for the specified roles."),
+    DESCRIPTION(
+        "PUT: Validates the request body as JSON",
+        " and updates the weights for the specified roles."));
+}
+
+
+Future<Response> Master::Http::weights(
+    const Request& request,
+    const Option<string>& principal) const
+{
+  // Dispatch based on HTTP method to separate `WeightsHandler`.
+  if (request.method == "PUT") {
+    return weightsHandler.update(request, principal);
+  }
+
+  // TODO(Yongqiao Wang): Like /quota, we should also add query logic
+  // for /weights to keep consistent. Then /roles no longer needs to
+  // show weight information.
+
+  return MethodNotAllowed(
+      {"PUT"},
+      "Expecting 'PUT', received '" + request.method + "'");
+}
+
+
 string Master::Http::STATE_HELP()
 {
   return HELP(

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 57ff4a3..a952c0a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -981,6 +981,14 @@ void Master::initialize()
           Http::log(request);
           return http.quota(request, principal);
         });
+  route("/weights",
+        DEFAULT_HTTP_AUTHENTICATION_REALM,
+        Http::WEIGHTS_HELP(),
+        [this](const process::http::Request& request,
+               const Option<string>& principal) {
+          Http::log(request);
+          return http.weights(request, principal);
+        });
 
   // Provide HTTP assets from a "webui" directory. This is either
   // specified via flags (which is necessary for running out of the
@@ -1517,6 +1525,62 @@ Future<Nothing> Master::_recover(const Registry& registry)
   // satisfiable given all recovering agents reregister. We may want
   // to notify operators early if total quota cannot be met.
 
+  if (registry.weights_size() != 0) {
+    vector<WeightInfo> weightInfos;
+    hashmap<std::string, double> registry_weights;
+
+    // Save the weights.
+    foreach (const Registry::Weight& weight, registry.weights()) {
+      registry_weights[weight.info().role()] = weight.info().weight();
+      WeightInfo weightInfo;
+      weightInfo.set_role(weight.info().role());
+      weightInfo.set_weight(weight.info().weight());
+      weightInfos.push_back(weightInfo);
+    }
+
+    // TODO(Yongqiao Wang): After the Mesos master quorum is achieved,
+    // operator can send an update weights request to do a batch configuration
+    // for weights, so the `--weights` flag can be deprecated and this check
+    // can eventually be removed.
+    if (!weights.empty()) {
+      LOG(WARNING) << "Ignoring the --weights flag '" << flags.weights.get()
+                   << "', and recovering the weights from registry.";
+
+      // Before recovering weights from the registry, the allocator was already
+      // initialized with `--weights`, so here we need to reset (to 1.0)
+      // weights in the allocator that are not overridden by the registry.
+      foreachkey (const std::string& role, weights) {
+        if (!registry_weights.contains(role)) {
+          WeightInfo weightInfo;
+          weightInfo.set_role(role);
+          weightInfo.set_weight(1.0);
+          weightInfos.push_back(weightInfo);
+        }
+      }
+      // Clear weights specified by `--weights` flag.
+      weights.clear();
+    }
+
+    // Recover `weights` with `registry_weights`.
+    weights = registry_weights;
+
+    // Update allocator.
+    allocator->updateWeights(weightInfos);
+  } else if (!weights.empty()) {
+    // The allocator was already updated with the `--weights` flag values
+    // on startup.
+    // Initialize the registry with `--weights` flag when bootstrapping
+    // the cluster.
+    vector<WeightInfo> weightInfos;
+    foreachpair (const std::string& role, double weight, weights) {
+      WeightInfo weightInfo;
+      weightInfo.set_role(role);
+      weightInfo.set_weight(weight);
+      weightInfos.push_back(weightInfo);
+    }
+    registrar->apply(Owned<Operation>(new UpdateWeights(weightInfos)));
+  }
+
   // Recovery is now complete!
   LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " slaves"
             << " from the Registry (" << Bytes(registry.ByteSize()) << ")"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ea26670..a1b4f03 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1017,12 +1017,44 @@ private:
     Master* master;
   };
 
+  /**
+   * Inner class used to namespace the handling of /weights requests.
+   *
+   * It operates inside the Master actor. It is responsible for validating
+   * and persisting /weights requests.
+   * @see master/weights_handler.cpp for implementations.
+   */
+  class WeightsHandler
+  {
+  public:
+    explicit WeightsHandler(Master* _master) : master(_master)
+    {
+      CHECK_NOTNULL(master);
+    }
+
+    process::Future<process::http::Response> update(
+        const process::http::Request& request,
+        const Option<std::string>& principal) const;
+
+  private:
+    process::Future<bool> authorize(
+        const Option<std::string>& principal,
+        const std::vector<std::string>& roles) const;
+
+    process::Future<process::http::Response> _update(
+        const std::vector<WeightInfo>& updateWeightInfos) const;
+
+    Master* master;
+  };
+
   // Inner class used to namespace HTTP route handlers (see
   // master/http.cpp for implementations).
   class Http
   {
   public:
-    explicit Http(Master* _master) : master(_master), quotaHandler(_master) {}
+    explicit Http(Master* _master) : master(_master),
+                                     quotaHandler(_master),
+                                     weightsHandler(_master) {}
 
     // Logs the request, route handlers can compose this with the
     // desired request handler to get consistent request logging.
@@ -1118,6 +1150,11 @@ private:
         const process::http::Request& request,
         const Option<std::string>& principal) const;
 
+    // /master/weights
+    process::Future<process::http::Response> weights(
+        const process::http::Request& request,
+        const Option<std::string>& principal) const;
+
     static std::string SCHEDULER_HELP();
     static std::string FLAGS_HELP();
     static std::string FRAMEWORKS();
@@ -1139,6 +1176,7 @@ private:
     static std::string RESERVE_HELP();
     static std::string UNRESERVE_HELP();
     static std::string QUOTA_HELP();
+    static std::string WEIGHTS_HELP();
 
   private:
     // Continuations.
@@ -1174,6 +1212,10 @@ private:
     // NOTE: The quota specific pieces of the Operator API are factored
     // out into this separate class.
     QuotaHandler quotaHandler;
+
+    // NOTE: The weights specific pieces of the Operator API are factored
+    // out into this separate class.
+    WeightsHandler weightsHandler;
   };
 
   Master(const Master&);              // No copying.
@@ -1487,6 +1529,56 @@ private:
 };
 
 
+// Implementation of weights update Registrar operation.
+class UpdateWeights : public Operation
+{
+public:
+  explicit UpdateWeights(const std::vector<WeightInfo>& _weightInfos)
+    : weightInfos(_weightInfos) {}
+
+protected:
+  virtual Try<bool> perform(Registry* registry, hashset<SlaveID>*, bool)
+  {
+    bool mutated = false;
+    if (weightInfos.empty()) {
+      return false; // No mutation.
+    }
+
+    foreach (const WeightInfo& weightInfo, weightInfos) {
+      bool hasStored = false;
+      for (int i = 0; i < registry->weights().size(); ++i) {
+        Registry::Weight* weight = registry->mutable_weights(i);
+
+        if (weight->info().role() != weightInfo.role()) {
+          continue;
+        }
+
+        hasStored = true;
+
+        // If there is already weight stored for the specified role
+        // and also its value is changed, update the entry.
+        if (weight->info().weight() != weightInfo.weight()) {
+          weight->mutable_info()->CopyFrom(weightInfo);
+          mutated = true;
+        }
+        break;
+      }
+
+      // If there is no weight yet for this role in registry,
+      // create a new entry.
+      if (!hasStored) {
+        registry->add_weights()->mutable_info()->CopyFrom(weightInfo);
+        mutated = true;
+      }
+    }
+    return mutated;
+  }
+
+private:
+  const std::vector<WeightInfo> weightInfos;
+};
+
+
 // Implementation of slave admission Registrar operation.
 class AdmitSlave : public Operation
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 9958f9c..9bf9998 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -53,6 +53,10 @@ message Registry {
     required quota.QuotaInfo info = 1;
   }
 
+  message Weight {
+    required WeightInfo info = 1;
+  }
+
   // Most recent leading master.
   optional Master master = 1;
 
@@ -73,4 +77,8 @@ message Registry {
   // assignment of resources, a newly elected master shall reconstruct it
   // from the cluster.
   repeated Quota quotas = 5;
+
+  // A list of recorded weights in the cluster, a newly elected master shall
+  // reconstruct it from the registry.
+  repeated Weight weights = 6;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/weights_handler.cpp
----------------------------------------------------------------------
diff --git a/src/master/weights_handler.cpp b/src/master/weights_handler.cpp
new file mode 100644
index 0000000..be4b5ab
--- /dev/null
+++ b/src/master/weights_handler.cpp
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include "master/master.hpp"
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+
+namespace http = process::http;
+
+using google::protobuf::RepeatedPtrField;
+
+using std::string;
+using std::vector;
+
+using http::Accepted;
+using http::BadRequest;
+using http::Conflict;
+using http::Forbidden;
+using http::OK;
+
+using process::Future;
+using process::Owned;
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+Future<http::Response> Master::WeightsHandler::update(
+    const http::Request& request,
+    const Option<std::string>& principal) const
+{
+  VLOG(1) << "Updating weights from request: '" << request.body << "'";
+
+  // Check that the request type is PUT which is guaranteed by the master.
+  CHECK_EQ("PUT", request.method);
+
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(request.body);
+  if (parse.isError()) {
+    return BadRequest(
+        "Failed to parse update request JSON ('" + request.body + "': " +
+        parse.error());
+  }
+
+  // Create Protobuf representation of weights.
+  Try<RepeatedPtrField<WeightInfo>> weightInfos =
+    ::protobuf::parse<RepeatedPtrField<WeightInfo>>(parse.get());
+
+  if (weightInfos.isError()) {
+    return BadRequest(
+        "Failed to convert weights JSON array to protobuf ('" +
+        request.body + "'): " + weightInfos.error());
+  }
+
+  vector<WeightInfo> validatedWeightInfos;
+  vector<string> roles;
+  for (WeightInfo& weightInfo : weightInfos.get()) {
+    string role = strings::trim(weightInfo.role());
+
+    if (role.empty()) {
+      return BadRequest(
+          "Role cannot be empty for weight '" +
+          stringify(weightInfo.weight()) + "'");
+    }
+
+    if (weightInfo.weight() <= 0) {
+      return BadRequest(
+          "Invalid weight '" + stringify(weightInfo.weight()) +
+          "' for role '" + role + "'. Weights must be positive.");
+    }
+
+    if (!master->isWhitelistedRole(role)) {
+      return BadRequest(
+          "Invalid role: '" + role + "', which must exist in the static " +
+          "list of roles, specified when the master is started" +
+          " (via the --roles flag).");
+    }
+    weightInfo.set_role(role);
+    validatedWeightInfos.push_back(weightInfo);
+    roles.push_back(role);
+  }
+
+  return authorize(principal, roles)
+    .then(defer(master->self(), [=](bool authorized) -> Future<http::Response> {
+      if (!authorized) {
+        return Forbidden();
+      }
+
+      return _update(validatedWeightInfos);
+    }));
+}
+
+
+Future<http::Response> Master::WeightsHandler::_update(
+    const vector<WeightInfo>& weightInfos) const
+{
+  // Update the registry and acknowledge the request.
+  return master->registrar->apply(Owned<Operation>(
+      new UpdateWeights(weightInfos)))
+    .then(defer(master->self(), [=](bool result) -> Future<http::Response> {
+      CHECK(result);
+
+      // Update weights.
+      foreach (const WeightInfo& weightInfo, weightInfos) {
+        master->weights[weightInfo.role()] = weightInfo.weight();
+      }
+
+      // Notify allocator for updating weights.
+      master->allocator->updateWeights(weightInfos);
+      return OK();
+    }));
+}
+
+
+Future<bool> Master::WeightsHandler::authorize(
+    const Option<string>& principal,
+    const vector<string>& roles) const
+{
+  if (master->authorizer.isNone()) {
+    return true;
+  }
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? principal.get() : "ANY")
+            << "' to update weights for roles '" << stringify(roles) << "'";
+
+  mesos::ACL::UpdateWeights request;
+
+  if (principal.isSome()) {
+    request.mutable_principals()->add_values(principal.get());
+  } else {
+    request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  foreach (const string& role, roles) {
+    request.mutable_roles()->add_values(role);
+  }
+
+  return master->authorizer.get()->authorize(request);
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 395b23d..5770721 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -762,6 +762,9 @@ MockAuthorizer::MockAuthorizer()
   EXPECT_CALL(*this, authorize(An<const mesos::ACL::RemoveQuota&>()))
     .WillRepeatedly(Return(true));
 
+  EXPECT_CALL(*this, authorize(An<const mesos::ACL::UpdateWeights&>()))
+    .WillRepeatedly(Return(true));
+
   EXPECT_CALL(*this, initialize(An<const Option<ACLs>&>()))
     .WillRepeatedly(Return(Nothing()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9c62833..9409da7 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1654,6 +1654,8 @@ public:
       authorize, process::Future<bool>(const ACL::SetQuota& request));
   MOCK_METHOD1(
       authorize, process::Future<bool>(const ACL::RemoveQuota& request));
+  MOCK_METHOD1(
+      authorize, process::Future<bool>(const ACL::UpdateWeights& request));
 };