You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/03/09 08:26:12 UTC
[4/4] mesos git commit: Introduced HTTP endpoint /weights for
updating weight.
Introduced HTTP endpoint /weights for updating weight.
Review: https://reviews.apache.org/r/41681/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0fc4fd6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0fc4fd6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0fc4fd6
Branch: refs/heads/master
Commit: d0fc4fd6510f70d540caccbac3014984f08696ef
Parents: 7372385
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Tue Mar 8 14:57:22 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Tue Mar 8 23:18:41 2016 -0800
----------------------------------------------------------------------
include/mesos/authorizer/authorizer.hpp | 14 +++
include/mesos/authorizer/authorizer.proto | 10 ++
src/CMakeLists.txt | 1 +
src/Makefile.am | 1 +
src/authorizer/local/authorizer.cpp | 29 +++++
src/authorizer/local/authorizer.hpp | 2 +
src/master/http.cpp | 30 +++++
src/master/master.cpp | 64 ++++++++++
src/master/master.hpp | 94 ++++++++++++++-
src/master/registry.proto | 8 ++
src/master/weights_handler.cpp | 160 +++++++++++++++++++++++++
src/tests/mesos.cpp | 3 +
src/tests/mesos.hpp | 2 +
13 files changed, 417 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/include/mesos/authorizer/authorizer.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.hpp b/include/mesos/authorizer/authorizer.hpp
index ec6c992..7054826 100644
--- a/include/mesos/authorizer/authorizer.hpp
+++ b/include/mesos/authorizer/authorizer.hpp
@@ -201,6 +201,20 @@ public:
virtual process::Future<bool> authorize(
const ACL::RemoveQuota& request) = 0;
+ /**
+ * Verifies whether a principal can update the weights of the specified
+ * roles.
+ *
+ * @param request `ACL::UpdateWeights` packing all the parameters needed to
+ * verify if the given principal is allowed to update the weights of the
+ * specified roles.
+ *
+ * @return true if the principal is allowed to update the weight of every
+ * specified role, false otherwise. A failed future indicates a problem
+ * processing the request; the request can be retried.
+ */
+ virtual process::Future<bool> authorize(
+ const ACL::UpdateWeights& request) = 0;
+
protected:
Authorizer() {}
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 723da93..d65377c 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -135,6 +135,15 @@ message ACL {
// Objects: Principal of the entity that set the quota.
required Entity quota_principals = 2;
}
+
+ // Which principals are authorized to update weights for the given roles.
+ message UpdateWeights {
+ // Subjects: Operator username.
+ required Entity principals = 1;
+
+ // Objects: The list of roles whose weights can be updated.
+ optional Entity roles = 2;
+ }
}
@@ -171,4 +180,5 @@ message ACLs {
repeated ACL.SetQuota set_quotas = 9;
repeated ACL.RemoveQuota remove_quotas = 10;
repeated ACL.TeardownFramework teardown_frameworks = 11;
+ repeated ACL.UpdateWeights update_weights = 12;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8f57a57..b78d0f4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -208,6 +208,7 @@ set(MASTER_SRC
master/registrar.cpp
master/repairer.cpp
master/validation.cpp
+ master/weights_handler.cpp
master/allocator/allocator.cpp
master/allocator/mesos/hierarchical.cpp
master/allocator/mesos/metrics.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a41e95d..b449343 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -627,6 +627,7 @@ libmesos_no_3rdparty_la_SOURCES += \
master/registrar.cpp \
master/repairer.cpp \
master/validation.cpp \
+ master/weights_handler.cpp \
master/allocator/allocator.cpp \
master/allocator/mesos/hierarchical.cpp \
master/allocator/mesos/metrics.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index 4e5c2c2..15c857d 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -206,6 +206,21 @@ public:
return acls.permissive(); // None of the ACLs match.
}
+  // Decides an `UpdateWeights` request against the configured ACLs. The
+  // first ACL whose principals and roles both match the request determines
+  // the outcome; if no ACL matches, the `permissive` setting is returned.
+  Future<bool> authorize(const ACL::UpdateWeights& request)
+  {
+    foreach (const ACL::UpdateWeights& acl, acls.update_weights()) {
+      // An ACL is only considered if both subjects and objects match.
+      if (!matches(request.principals(), acl.principals()) ||
+          !matches(request.roles(), acl.roles())) {
+        continue;
+      }
+
+      // A matching ACL allows the request only if both subjects and
+      // objects are allowed.
+      return allows(request.principals(), acl.principals()) &&
+             allows(request.roles(), acl.roles());
+    }
+
+    return acls.permissive(); // None of the ACLs match.
+  }
+
private:
// Match matrix:
//
@@ -494,5 +509,19 @@ Future<bool> LocalAuthorizer::authorize(const ACL::RemoveQuota& request)
process, static_cast<F>(&LocalAuthorizerProcess::authorize), request);
}
+
+// Forwards an `UpdateWeights` authorization request to the underlying
+// `LocalAuthorizerProcess`. Returns a failure if `process` is NULL.
+Future<bool> LocalAuthorizer::authorize(const ACL::UpdateWeights& request)
+{
+  if (process != NULL) {
+    // `authorize` is overloaded, so spell out the exact member function
+    // type to select the `UpdateWeights` overload for `dispatch`.
+    using Authorize =
+      Future<bool>(LocalAuthorizerProcess::*)(const ACL::UpdateWeights&);
+
+    Authorize method = &LocalAuthorizerProcess::authorize;
+
+    return dispatch(process, method, request);
+  }
+
+  return Failure("Authorizer not initialized");
+}
+
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/authorizer/local/authorizer.hpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.hpp b/src/authorizer/local/authorizer.hpp
index 96baf77..c87a991 100644
--- a/src/authorizer/local/authorizer.hpp
+++ b/src/authorizer/local/authorizer.hpp
@@ -69,6 +69,8 @@ public:
const ACL::SetQuota& request);
virtual process::Future<bool> authorize(
const ACL::RemoveQuota& request);
+ virtual process::Future<bool> authorize(
+ const ACL::UpdateWeights& request);
private:
LocalAuthorizer();
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a3ad57a..7304bfd 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1157,6 +1157,36 @@ Future<Response> Master::Http::quota(
}
+// Builds the help text that is registered together with the /weights route.
+string Master::Http::WEIGHTS_HELP()
+{
+  // One-line summary of the endpoint.
+  const string summary = TLDR(
+      "Updates weights for the specified roles.");
+
+  // Longer description of the supported method.
+  const string details = DESCRIPTION(
+      "PUT: Validates the request body as JSON",
+      " and updates the weights for the specified roles.");
+
+  return HELP(summary, details);
+}
+
+
+// Entry point for requests to /weights. Only 'PUT' is supported: such
+// requests are delegated to the `WeightsHandler`, every other method is
+// answered with `MethodNotAllowed`.
+Future<Response> Master::Http::weights(
+    const Request& request,
+    const Option<string>& principal) const
+{
+  // TODO(Yongqiao Wang): Like /quota, we should also add query logic
+  // for /weights to keep consistent. Then /roles no longer needs to
+  // show weight information.
+
+  if (request.method != "PUT") {
+    return MethodNotAllowed(
+        {"PUT"},
+        "Expecting 'PUT', received '" + request.method + "'");
+  }
+
+  // Updates are implemented by the separate `WeightsHandler`.
+  return weightsHandler.update(request, principal);
+}
+
+
string Master::Http::STATE_HELP()
{
return HELP(
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 57ff4a3..a952c0a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -981,6 +981,14 @@ void Master::initialize()
Http::log(request);
return http.quota(request, principal);
});
+ route("/weights",
+ DEFAULT_HTTP_AUTHENTICATION_REALM,
+ Http::WEIGHTS_HELP(),
+ [this](const process::http::Request& request,
+ const Option<string>& principal) {
+ Http::log(request);
+ return http.weights(request, principal);
+ });
// Provide HTTP assets from a "webui" directory. This is either
// specified via flags (which is necessary for running out of the
@@ -1517,6 +1525,62 @@ Future<Nothing> Master::_recover(const Registry& registry)
// satisfiable given all recovering agents reregister. We may want
// to notify operators early if total quota cannot be met.
+ if (registry.weights_size() != 0) {
+ vector<WeightInfo> weightInfos;
+ hashmap<std::string, double> registry_weights;
+
+ // Save the weights.
+ foreach (const Registry::Weight& weight, registry.weights()) {
+ registry_weights[weight.info().role()] = weight.info().weight();
+ WeightInfo weightInfo;
+ weightInfo.set_role(weight.info().role());
+ weightInfo.set_weight(weight.info().weight());
+ weightInfos.push_back(weightInfo);
+ }
+
+ // TODO(Yongqiao Wang): After the Mesos master quorum is achieved,
+ // operator can send an update weights request to do a batch configuration
+ // for weights, so the `--weights` flag can be deprecated and this check
+ // can eventually be removed.
+ if (!weights.empty()) {
+ LOG(WARNING) << "Ignoring the --weights flag '" << flags.weights.get()
+ << "', and recovering the weights from registry.";
+
+ // Before recovering weights from the registry, the allocator was already
+ // initialized with `--weights`, so here we need to reset (to 1.0)
+ // weights in the allocator that are not overridden by the registry.
+ foreachkey (const std::string& role, weights) {
+ if (!registry_weights.contains(role)) {
+ WeightInfo weightInfo;
+ weightInfo.set_role(role);
+ weightInfo.set_weight(1.0);
+ weightInfos.push_back(weightInfo);
+ }
+ }
+ // Clear weights specified by `--weights` flag.
+ weights.clear();
+ }
+
+ // Recover `weights` with `registry_weights`.
+ weights = registry_weights;
+
+ // Update allocator.
+ allocator->updateWeights(weightInfos);
+ } else if (!weights.empty()) {
+ // The allocator was already updated with the `--weights` flag values
+ // on startup.
+ // Initialize the registry with `--weights` flag when bootstrapping
+ // the cluster.
+ vector<WeightInfo> weightInfos;
+ foreachpair (const std::string& role, double weight, weights) {
+ WeightInfo weightInfo;
+ weightInfo.set_role(role);
+ weightInfo.set_weight(weight);
+ weightInfos.push_back(weightInfo);
+ }
+ registrar->apply(Owned<Operation>(new UpdateWeights(weightInfos)));
+ }
+
// Recovery is now complete!
LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " slaves"
<< " from the Registry (" << Bytes(registry.ByteSize()) << ")"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ea26670..a1b4f03 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1017,12 +1017,44 @@ private:
Master* master;
};
+ /**
+ * Inner class used to namespace the handling of /weights requests.
+ *
+ * It operates inside the Master actor. It is responsible for validating,
+ * authorizing and persisting /weights requests.
+ *
+ * `update` is the public entry point for a request. `authorize` and
+ * `_update` are its private steps: the former asks the master's
+ * authorizer (if one is configured) whether `principal` may update the
+ * weights of `roles`; the latter applies the given weights through the
+ * registrar and then updates the master's weights and the allocator.
+ *
+ * The constructor checks that the given `Master*` is not null.
+ * @see master/weights_handler.cpp for implementations.
+ */
+ class WeightsHandler
+ {
+ public:
+ explicit WeightsHandler(Master* _master) : master(_master)
+ {
+ CHECK_NOTNULL(master);
+ }
+
+ process::Future<process::http::Response> update(
+ const process::http::Request& request,
+ const Option<std::string>& principal) const;
+
+ private:
+ process::Future<bool> authorize(
+ const Option<std::string>& principal,
+ const std::vector<std::string>& roles) const;
+
+ process::Future<process::http::Response> _update(
+ const std::vector<WeightInfo>& updateWeightInfos) const;
+
+ Master* master;
+ };
+
// Inner class used to namespace HTTP route handlers (see
// master/http.cpp for implementations).
class Http
{
public:
- explicit Http(Master* _master) : master(_master), quotaHandler(_master) {}
+ explicit Http(Master* _master) : master(_master),
+ quotaHandler(_master),
+ weightsHandler(_master) {}
// Logs the request, route handlers can compose this with the
// desired request handler to get consistent request logging.
@@ -1118,6 +1150,11 @@ private:
const process::http::Request& request,
const Option<std::string>& principal) const;
+ // /master/weights
+ process::Future<process::http::Response> weights(
+ const process::http::Request& request,
+ const Option<std::string>& principal) const;
+
static std::string SCHEDULER_HELP();
static std::string FLAGS_HELP();
static std::string FRAMEWORKS();
@@ -1139,6 +1176,7 @@ private:
static std::string RESERVE_HELP();
static std::string UNRESERVE_HELP();
static std::string QUOTA_HELP();
+ static std::string WEIGHTS_HELP();
private:
// Continuations.
@@ -1174,6 +1212,10 @@ private:
// NOTE: The quota specific pieces of the Operator API are factored
// out into this separate class.
QuotaHandler quotaHandler;
+
+ // NOTE: The weights specific pieces of the Operator API are factored
+ // out into this separate class.
+ WeightsHandler weightsHandler;
};
Master(const Master&); // No copying.
@@ -1487,6 +1529,56 @@ private:
};
+// Implementation of weights update Registrar operation.
+class UpdateWeights : public Operation
+{
+public:
+ explicit UpdateWeights(const std::vector<WeightInfo>& _weightInfos)
+ : weightInfos(_weightInfos) {}
+
+protected:
+ virtual Try<bool> perform(Registry* registry, hashset<SlaveID>*, bool)
+ {
+ bool mutated = false;
+ if (weightInfos.empty()) {
+ return false; // No mutation.
+ }
+
+ foreach (const WeightInfo& weightInfo, weightInfos) {
+ bool hasStored = false;
+ for (int i = 0; i < registry->weights().size(); ++i) {
+ Registry::Weight* weight = registry->mutable_weights(i);
+
+ if (weight->info().role() != weightInfo.role()) {
+ continue;
+ }
+
+ hasStored = true;
+
+ // If there is already weight stored for the specified role
+ // and also its value is changed, update the entry.
+ if (weight->info().weight() != weightInfo.weight()) {
+ weight->mutable_info()->CopyFrom(weightInfo);
+ mutated = true;
+ }
+ break;
+ }
+
+ // If there is no weight yet for this role in registry,
+ // create a new entry.
+ if (!hasStored) {
+ registry->add_weights()->mutable_info()->CopyFrom(weightInfo);
+ mutated = true;
+ }
+ }
+ return mutated;
+ }
+
+private:
+ const std::vector<WeightInfo> weightInfos;
+};
+
+
// Implementation of slave admission Registrar operation.
class AdmitSlave : public Operation
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 9958f9c..9bf9998 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -53,6 +53,10 @@ message Registry {
required quota.QuotaInfo info = 1;
}
+ message Weight {
+ required WeightInfo info = 1;
+ }
+
// Most recent leading master.
optional Master master = 1;
@@ -73,4 +77,8 @@ message Registry {
// assignment of resources, a newly elected master shall reconstruct it
// from the cluster.
repeated Quota quotas = 5;
+
+ // A list of recorded weights in the cluster, a newly elected master shall
+ // reconstruct it from the registry.
+ repeated Weight weights = 6;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/master/weights_handler.cpp
----------------------------------------------------------------------
diff --git a/src/master/weights_handler.cpp b/src/master/weights_handler.cpp
new file mode 100644
index 0000000..be4b5ab
--- /dev/null
+++ b/src/master/weights_handler.cpp
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include "master/master.hpp"
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+
+namespace http = process::http;
+
+using google::protobuf::RepeatedPtrField;
+
+using std::string;
+using std::vector;
+
+using http::Accepted;
+using http::BadRequest;
+using http::Conflict;
+using http::Forbidden;
+using http::OK;
+
+using process::Future;
+using process::Owned;
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Handles a 'PUT' request to /weights issued by `principal`.
+//
+// The request body must be a JSON array that converts to `WeightInfo`
+// messages. Every entry must name a role that is non-empty after trimming
+// and whitelisted by the master, and must carry a strictly positive weight;
+// any violation is answered with `BadRequest`. The trimmed roles are then
+// authorized: an unauthorized principal receives `Forbidden`, otherwise the
+// validated weights are handed to `_update`.
+Future<http::Response> Master::WeightsHandler::update(
+    const http::Request& request,
+    const Option<std::string>& principal) const
+{
+  VLOG(1) << "Updating weights from request: '" << request.body << "'";
+
+  // Check that the request type is PUT which is guaranteed by the master.
+  CHECK_EQ("PUT", request.method);
+
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(request.body);
+  if (parse.isError()) {
+    // NOTE: The quoting and parentheses here mirror the conversion error
+    // below, so both messages read "('<body>'): <error>".
+    return BadRequest(
+        "Failed to parse update request JSON ('" + request.body + "'): " +
+        parse.error());
+  }
+
+  // Create Protobuf representation of weights.
+  Try<RepeatedPtrField<WeightInfo>> weightInfos =
+    ::protobuf::parse<RepeatedPtrField<WeightInfo>>(parse.get());
+
+  if (weightInfos.isError()) {
+    return BadRequest(
+        "Failed to convert weights JSON array to protobuf ('" +
+        request.body + "'): " + weightInfos.error());
+  }
+
+  // Validate every entry; `roles` collects the trimmed role names in
+  // request order for the authorization step.
+  vector<WeightInfo> validatedWeightInfos;
+  vector<string> roles;
+  for (WeightInfo& weightInfo : weightInfos.get()) {
+    string role = strings::trim(weightInfo.role());
+
+    if (role.empty()) {
+      return BadRequest(
+          "Role cannot be empty for weight '" +
+          stringify(weightInfo.weight()) + "'");
+    }
+
+    if (weightInfo.weight() <= 0) {
+      return BadRequest(
+          "Invalid weight '" + stringify(weightInfo.weight()) +
+          "' for role '" + role + "'. Weights must be positive.");
+    }
+
+    if (!master->isWhitelistedRole(role)) {
+      return BadRequest(
+          "Invalid role: '" + role + "', which must exist in the static " +
+          "list of roles, specified when the master is started" +
+          " (via the --roles flag).");
+    }
+
+    // Store the trimmed role so that later steps see the normalized name.
+    weightInfo.set_role(role);
+    validatedWeightInfos.push_back(weightInfo);
+    roles.push_back(role);
+  }
+
+  return authorize(principal, roles)
+    .then(defer(master->self(), [=](bool authorized) -> Future<http::Response> {
+      if (!authorized) {
+        return Forbidden();
+      }
+
+      return _update(validatedWeightInfos);
+    }));
+}
+
+
+// Applies already validated and authorized weights: they are first written
+// to the registry and, once that operation has completed, stored in the
+// master's `weights` map and passed on to the allocator.
+Future<http::Response> Master::WeightsHandler::_update(
+    const vector<WeightInfo>& weightInfos) const
+{
+  // Update the registry and acknowledge the request.
+  Owned<Operation> operation(new UpdateWeights(weightInfos));
+
+  return master->registrar->apply(operation)
+    .then(defer(master->self(), [=](bool result) -> Future<http::Response> {
+      CHECK(result);
+
+      // Update weights.
+      for (const WeightInfo& info : weightInfos) {
+        master->weights[info.role()] = info.weight();
+      }
+
+      // Notify allocator for updating weights.
+      master->allocator->updateWeights(weightInfos);
+
+      return OK();
+    }));
+}
+
+
+// Asks the master's authorizer whether `principal` may update the weights
+// of `roles`. If the master has no authorizer, the request is allowed.
+Future<bool> Master::WeightsHandler::authorize(
+    const Option<string>& principal,
+    const vector<string>& roles) const
+{
+  const bool authorizationEnabled = master->authorizer.isSome();
+  if (!authorizationEnabled) {
+    return true;
+  }
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? principal.get() : "ANY")
+            << "' to update weights for roles '" << stringify(roles) << "'";
+
+  mesos::ACL::UpdateWeights request;
+
+  // Subjects: the given principal, or ANY if there is none.
+  mesos::ACL::Entity* subjects = request.mutable_principals();
+  if (principal.isNone()) {
+    subjects->set_type(mesos::ACL::Entity::ANY);
+  } else {
+    subjects->add_values(principal.get());
+  }
+
+  // Objects: the roles whose weights are to be updated. The `roles` field
+  // is only touched when there is at least one role to add.
+  for (const string& role : roles) {
+    request.mutable_roles()->add_values(role);
+  }
+
+  return master->authorizer.get()->authorize(request);
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 395b23d..5770721 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -762,6 +762,9 @@ MockAuthorizer::MockAuthorizer()
EXPECT_CALL(*this, authorize(An<const mesos::ACL::RemoveQuota&>()))
.WillRepeatedly(Return(true));
+ EXPECT_CALL(*this, authorize(An<const mesos::ACL::UpdateWeights&>()))
+ .WillRepeatedly(Return(true));
+
EXPECT_CALL(*this, initialize(An<const Option<ACLs>&>()))
.WillRepeatedly(Return(Nothing()));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0fc4fd6/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9c62833..9409da7 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1654,6 +1654,8 @@ public:
authorize, process::Future<bool>(const ACL::SetQuota& request));
MOCK_METHOD1(
authorize, process::Future<bool>(const ACL::RemoveQuota& request));
+ MOCK_METHOD1(
+ authorize, process::Future<bool>(const ACL::UpdateWeights& request));
};