You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/03 19:58:49 UTC
mesos git commit: Refactored task/offer/resource validation in master.
Repository: mesos
Updated Branches:
refs/heads/master 01d391b4a -> b22d7addb
Refactored task/offer/resource validation in master.
Review: https://reviews.apache.org/r/30459
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b22d7add
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b22d7add
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b22d7add
Branch: refs/heads/master
Commit: b22d7addbc03dfe4a5aa63a05e4f805b1c15631d
Parents: 01d391b
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jan 30 11:12:13 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 3 10:57:45 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 2 +
src/common/type_utils.hpp | 28 +-
src/master/master.cpp | 607 +----------------------------------------
src/master/master.hpp | 23 +-
src/master/validation.cpp | 526 +++++++++++++++++++++++++++++++++++
src/master/validation.hpp | 91 ++++++
6 files changed, 658 insertions(+), 619 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 07bea1f..5b1885d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -295,6 +295,7 @@ libmesos_no_3rdparty_la_SOURCES = \
master/registry.proto \
master/registrar.cpp \
master/repairer.cpp \
+ master/validation.cpp \
module/manager.cpp \
sched/constants.cpp \
sched/sched.cpp \
@@ -463,6 +464,7 @@ libmesos_no_3rdparty_la_SOURCES += \
master/repairer.hpp \
master/registrar.hpp \
master/sorter.hpp \
+ master/validation.hpp \
messages/messages.hpp \
module/authenticatee.hpp \
module/authenticator.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 6809a12..b23eeb8 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -55,14 +55,6 @@ inline bool operator == (const ExecutorID& left, const ExecutorID& right)
}
-inline std::ostream& operator << (
- std::ostream& stream,
- const ContainerInfo& containerInfo)
-{
- return stream << containerInfo.DebugString();
-}
-
-
inline bool operator == (const FrameworkID& left, const FrameworkID& right)
{
return left.value() == right.value();
@@ -135,6 +127,18 @@ inline bool operator != (const ContainerID& left, const ContainerID& right)
}
+inline bool operator != (const FrameworkID& left, const FrameworkID& right)
+{
+ return left.value() != right.value();
+}
+
+
+inline bool operator != (const SlaveID& left, const SlaveID& right)
+{
+ return left.value() != right.value();
+}
+
+
inline bool operator < (const ContainerID& left, const ContainerID& right)
{
return left.value() < right.value();
@@ -237,6 +241,14 @@ inline std::ostream& operator << (
inline std::ostream& operator << (
std::ostream& stream,
+ const ContainerInfo& containerInfo)
+{
+ return stream << containerInfo.DebugString();
+}
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
const ExecutorID& executorId)
{
return stream << executorId.value();
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e737fcb..d04b2c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1896,530 +1896,6 @@ void Master::resourceRequest(
}
-struct ResourcesValidator
-{
- virtual Option<Error> operator () (
- const Resources& resources,
- const Slave& slave) = 0;
-
- virtual ~ResourcesValidator() {}
-};
-
-
-struct DiskInfoValidator : ResourcesValidator
-{
- virtual Option<Error> operator () (
- const Resources& resources,
- const Slave& slave)
- {
- foreach (const Resource& resource, resources) {
- if (!resource.has_disk()) {
- continue;
- }
-
- if (resource.disk().has_persistence()) {
- if (resource.role() == "*") {
- return Error(
- "Invalid DiskInfo: '*' role not "
- "supported for persistent volume");
- }
- if (!resource.disk().has_volume()) {
- return Error(
- "Invalid DiskInfo: expecting 'volume' "
- "to be set for persistent volume");
- }
- if (resource.disk().volume().mode() == Volume::RO) {
- return Error(
- "Invalid DiskInfo: read-only persistent "
- "volume not supported");
- }
- if (resource.disk().volume().has_host_path()) {
- return Error(
- "Invalid DiskInfo: expecting 'host_path' "
- "to be unset for persistent volume");
- }
-
- // Ensure persistence ID does not have invalid characters.
- string id = resource.disk().persistence().id();
- if (std::count_if(id.begin(), id.end(), invalid) > 0) {
- return Error(
- "Invalid DiskInfo: persistence ID '" + id +
- "' contains invalid characters");
- }
- } else if (resource.disk().has_volume()) {
- return Error("Invalid DiskInfo: non-persistent volume not supported");
- } else {
- return Error("Invalid DiskInfo: empty");
- }
- }
-
- return None();
- }
-
- static bool invalid(char c)
- {
- return iscntrl(c) || c == '/' || c == '\\';
- }
-};
-
-
-struct UniquePersistenceIDValidator : ResourcesValidator
-{
- virtual Option<Error> operator () (
- const Resources& resources,
- const Slave& slave)
- {
- hashmap<string, hashset<string>> persistenceIds;
-
- // Check duplicated persistence ID within the given resources.
- foreach (const Resource& resource, resources) {
- if (!resource.has_disk() || !resource.disk().has_persistence()) {
- continue;
- }
-
- const string& role = resource.role();
- const string& id = resource.disk().persistence().id();
-
- if (persistenceIds.contains(role) &&
- persistenceIds[role].contains(id)) {
- return Error("Persistence ID '" + id + "' is not unique");
- }
-
- persistenceIds[role].insert(id);
- }
-
- return None();
- }
-};
-
-
-// Abstraction for performing any validations, aggregations, etc. of
-// tasks that a framework attempts to run within the resources
-// provided by offers. A validator can return an optional error which
-// will cause the master to send a failed status update back to the
-// framework for only that task. An instance will be reused for each
-// task from same 'launchTasks()', but not for task from different
-// offers.
-struct TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources) = 0;
-
- virtual ~TaskInfoValidator() {}
-};
-
-
-// Validates that a task id is valid, i.e., contains only valid
-// characters.
-struct TaskIDValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- const string& id = task.task_id().value();
-
- if (std::count_if(id.begin(), id.end(), invalid) > 0) {
- return Error("TaskID '" + id + "' contains invalid characters");
- }
-
- return None();
- }
-
- static bool invalid(char c)
- {
- return iscntrl(c) || c == '/' || c == '\\';
- }
-};
-
-
-// Validates that the slave ID used by a task is correct.
-struct SlaveIDValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- if (!(task.slave_id() == slave.id)) {
- return Error(
- "Task uses invalid slave " + task.slave_id().value() +
- " while slave " + slave.id.value() + " is expected");
- }
-
- return None();
- }
-};
-
-
-// Validates that each task uses a unique ID. Regardless of whether a
-// task actually gets launched (for example, another validator may
-// return an error for a task), we always consider it an error when a
-// task tries to re-use an ID.
-struct UniqueTaskIDValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- const TaskID& taskId = task.task_id();
-
- if (framework.tasks.contains(taskId)) {
- return Error("Task has duplicate ID: " + taskId.value());
- }
-
- return None();
- }
-};
-
-
-// Validates that resources specified by the framework are valid.
-struct ResourceValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- Option<Error> error = Resources::validate(task.resources());
- if (error.isSome()) {
- return Error("Task uses invalid resources: " + error.get().message);
- }
-
- Resources resources = task.resources();
-
- if (task.has_executor()) {
- Option<Error> error = Resources::validate(task.executor().resources());
- if (error.isSome()) {
- return Error(
- "Executor uses invalid resources: " + error.get().message);
- }
-
- resources += task.executor().resources();
- }
-
- vector<Owned<ResourcesValidator>> validators = {
- Owned<ResourcesValidator>(new DiskInfoValidator()),
- Owned<ResourcesValidator>(new UniquePersistenceIDValidator())
- };
-
- foreach (const Owned<ResourcesValidator>& validator, validators) {
- error = (*validator)(resources, slave);
- if (error.isSome()) {
- return Error(error.get().message);
- }
- }
-
- return None();
- }
-};
-
-
-// Validates that the task and the executor are using proper amount of
-// resources. For instance, the used resources by a task on each slave
-// should not exceed the total resources offered on that slave.
-struct ResourceUsageValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- Resources taskResources = task.resources();
-
- if (taskResources.empty()) {
- return Error("Task uses no resources");
- }
-
- Resources executorResources;
- if (task.has_executor()) {
- executorResources = task.executor().resources();
- }
-
- // Validate minimal cpus and memory resources of executor and log
- // warnings if not set.
- if (task.has_executor()) {
- // TODO(martin): MESOS-1807. Return Error instead of logging a
- // warning in 0.22.0.
- Option<double> cpus = executorResources.cpus();
- if (cpus.isNone() || cpus.get() < MIN_CPUS) {
- LOG(WARNING)
- << "Executor " << stringify(task.executor().executor_id())
- << " for task " << stringify(task.task_id())
- << " uses less CPUs ("
- << (cpus.isSome() ? stringify(cpus.get()) : "None")
- << ") than the minimum required (" << MIN_CPUS
- << "). Please update your executor, as this will be mandatory "
- << "in future releases.";
- }
-
- Option<Bytes> mem = executorResources.mem();
- if (mem.isNone() || mem.get() < MIN_MEM) {
- LOG(WARNING)
- << "Executor " << stringify(task.executor().executor_id())
- << " for task " << stringify(task.task_id())
- << " uses less memory ("
- << (mem.isSome() ? stringify(mem.get().megabytes()) : "None")
- << ") than the minimum required (" << MIN_MEM
- << "). Please update your executor, as this will be mandatory "
- << "in future releases.";
- }
- }
-
- // Validate if resources needed by the task (and its executor in
- // case the executor is new) are available.
- Resources resources = taskResources;
- if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
- resources += executorResources;
- }
-
- if (!offeredResources.contains(resources + usedResources)) {
- return Error(
- "Task uses more resources " + stringify(resources) +
- " than available " + stringify(offeredResources - usedResources));
- }
-
- return None();
- }
-};
-
-
-// Validates that tasks that use the "same" executor (i.e., same
-// ExecutorID) have an identical ExecutorInfo.
-struct ExecutorInfoValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- if (task.has_executor() == task.has_command()) {
- return Error(
- "Task should have at least one (but not both) of CommandInfo or "
- "ExecutorInfo present");
- }
-
- if (task.has_executor()) {
- // The master currently expects ExecutorInfo.framework_id to be
- // set even though it is an optional field. Currently, the
- // scheduler driver ensures that the field is set. For
- // schedulers not using the driver, we need to do the validation
- // here.
- // TODO(bmahler): Set this field in the master instead of
- // depending on the scheduler driver do it.
- if (!task.executor().has_framework_id()) {
- return Error(
- "Task has invalid ExecutorInfo: missing field 'framework_id'");
- }
-
- if (!(task.executor().framework_id() == framework.id)) {
- return Error(
- "ExecutorInfo has an invalid FrameworkID"
- " (Actual: " + stringify(task.executor().framework_id()) +
- " vs Expected: " + stringify(framework.id) + ")");
- }
-
- const ExecutorID& executorId = task.executor().executor_id();
- Option<ExecutorInfo> executorInfo = None();
-
- if (slave.hasExecutor(framework.id, executorId)) {
- executorInfo = slave.executors.get(framework.id).get().get(executorId);
- }
-
- if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
- return Error(
- "Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible).\n"
- "------------------------------------------------------------\n"
- "Existing ExecutorInfo:\n" +
- stringify(executorInfo.get()) + "\n"
- "------------------------------------------------------------\n"
- "Task's ExecutorInfo:\n" +
- stringify(task.executor()) + "\n"
- "------------------------------------------------------------\n");
- }
- }
-
- return None();
- }
-};
-
-
-// Validates that a task that asks for checkpointing is not being
-// launched on a slave that has not enabled checkpointing.
-struct CheckpointValidator : TaskInfoValidator
-{
- virtual Option<Error> operator () (
- const TaskInfo& task,
- const Framework& framework,
- const Slave& slave,
- const Resources& offeredResources,
- const Resources& usedResources)
- {
- if (framework.info.checkpoint() && !slave.info.checkpoint()) {
- return Error(
- "Task asked to be checkpointed but slave " +
- stringify(slave.id) + " has checkpointing disabled");
- }
-
- return None();
- }
-};
-
-
-// OfferValidators are similar to the TaskInfoValidator pattern and
-// are used for validation and aggregation of offers. The error
-// reporting scheme is also similar to TaskInfoValidator. However,
-// offer processing (and subsequent task processing) is aborted
-// altogether if offer validator reports an error.
-struct OfferValidator
-{
- virtual Option<Error> operator () (
- const OfferID& offerId,
- const Framework& framework,
- Master* master) = 0;
-
- virtual ~OfferValidator() {}
-
- Slave* getSlave(Master* master, const SlaveID& slaveId)
- {
- CHECK_NOTNULL(master);
- return master->getSlave(slaveId);
- }
-
- Offer* getOffer(Master* master, const OfferID& offerId)
- {
- CHECK_NOTNULL(master);
- return master->getOffer(offerId);
- }
-};
-
-
-// Validates the validity/liveness of an offer.
-struct ValidOfferValidator : OfferValidator
-{
- virtual Option<Error> operator () (
- const OfferID& offerId,
- const Framework& framework,
- Master* master)
- {
- Offer* offer = getOffer(master, offerId);
- if (offer == NULL) {
- return Error("Offer " + stringify(offerId) + " is no longer valid");
- }
-
- return None();
- }
-};
-
-
-// Validates that an offer belongs to the expected framework.
-struct FrameworkValidator : OfferValidator
-{
- virtual Option<Error> operator () (
- const OfferID& offerId,
- const Framework& framework,
- Master* master)
- {
- Offer* offer = getOffer(master, offerId);
- if (offer == NULL) {
- return Error("Offer " + stringify(offerId) + " is no longer valid");
- }
-
- if (!(framework.id == offer->framework_id())) {
- return Error(
- "Offer " + stringify(offer->id()) +
- " has invalid framework " + stringify(offer->framework_id()) +
- " while framework " + stringify(framework.id) + " is expected");
- }
-
- return None();
- }
-};
-
-
-// Validates that the slave is valid and ensures that all offers
-// belong to the same slave.
-struct SlaveValidator : OfferValidator
-{
- virtual Option<Error> operator () (
- const OfferID& offerId,
- const Framework& framework,
- Master* master)
- {
- Offer* offer = getOffer(master, offerId);
- if (offer == NULL) {
- return Error("Offer " + stringify(offerId) + " is no longer valid");
- }
-
- Slave* slave = getSlave(master, offer->slave_id());
-
- // This is not possible because the offer should've been removed.
- CHECK(slave != NULL)
- << "Offer " << offerId
- << " outlived slave " << offer->slave_id();
-
- // This is not possible because the offer should've been removed.
- CHECK(slave->connected)
- << "Offer " << offerId
- << " outlived disconnected slave " << *slave;
-
- if (slaveId.isNone()) {
- // Set slave id and use as base case for validation.
- slaveId = slave->id;
- } else if (!(slave->id == slaveId.get())) {
- return Error(
- "Aggregated offers must belong to one single slave. Offer " +
- stringify(offerId) + " uses slave " +
- stringify(slave->id) + " and slave " +
- stringify(slaveId.get()));
- }
-
- return None();
- }
-
- Option<const SlaveID> slaveId;
-};
-
-
-// Validates that an offer only appears once in offer list.
-struct UniqueOfferIDValidator : OfferValidator
-{
- virtual Option<Error> operator () (
- const OfferID& offerId,
- const Framework& framework,
- Master* master)
- {
- if (offers.contains(offerId)) {
- return Error("Duplicate offer " + stringify(offerId) + " in offer list");
- }
- offers.insert(offerId);
-
- return None();
- }
-
- hashset<OfferID> offers;
-};
-
-
void Master::launchTasks(
const UPID& from,
const FrameworkID& frameworkId,
@@ -2472,54 +1948,6 @@ void Master::launchTasks(
}
-Option<Error> Master::validateTask(
- const TaskInfo& task,
- Framework* framework,
- Slave* slave,
- const Resources& offeredResources,
- const Resources& usedResources)
-{
- CHECK_NOTNULL(framework);
- CHECK_NOTNULL(slave);
-
- // Create task validators.
- // NOTE: The order in which the following validators are executed
- // does matter! For example, ResourceUsageValidator assumes that
- // ExecutorInfo is valid which is verified by ExecutorInfoValidator.
- // TODO(vinod): Create the validators on the stack and make the
- // validate operation const.
- vector<Owned<TaskInfoValidator>> validators = {
- Owned<TaskInfoValidator>(new TaskIDValidator()),
- Owned<TaskInfoValidator>(new SlaveIDValidator()),
- Owned<TaskInfoValidator>(new UniqueTaskIDValidator()),
- Owned<TaskInfoValidator>(new CheckpointValidator()),
- Owned<TaskInfoValidator>(new ExecutorInfoValidator()),
- Owned<TaskInfoValidator>(new ResourceValidator()),
- Owned<TaskInfoValidator>(new ResourceUsageValidator())
- };
-
- // TODO(benh): Add a HealthCheckValidator.
-
- // TODO(jieyu): Add a CommandInfoValidator.
-
- // Invoke each validator.
- foreach (const Owned<TaskInfoValidator>& validator, validators) {
- Option<Error> error = (*validator)(
- task,
- *framework,
- *slave,
- offeredResources,
- usedResources);
-
- if (error.isSome()) {
- return Error(error.get().message);
- }
- }
-
- return None();
-}
-
-
Future<bool> Master::authorizeTask(
const TaskInfo& task,
Framework* framework)
@@ -2628,21 +2056,8 @@ void Master::accept(
if (accept.offer_ids().size() == 0) {
error = Error("No offers specified");
} else {
- vector<Owned<OfferValidator>> validators = {
- Owned<OfferValidator>(new ValidOfferValidator()),
- Owned<OfferValidator>(new FrameworkValidator()),
- Owned<OfferValidator>(new SlaveValidator()),
- Owned<OfferValidator>(new UniqueOfferIDValidator())
- };
-
// Validate the offers.
- foreach (const OfferID& offerId, accept.offer_ids()) {
- foreach (const Owned<OfferValidator>& validator, validators) {
- if (error.isNone()) {
- error = (*validator)(offerId, *framework, this);
- }
- }
- }
+ error = validation::offer::validate(accept.offer_ids(), this, framework);
// Compute offered resources and remove the offers. If the
// validation failed, return resources to the allocator.
@@ -2810,12 +2225,10 @@ void Master::_accept(
}
// Some offer operations update the offered resources. We keep
- // updated offered resources here.
+ // updated offered resources here. When a task is successfully
+ // launched, we remove its resource from offered resources.
Resources _offeredResources = offeredResources;
- // Accumulated resources used.
- Resources usedResources;
-
CHECK_READY(_authorizations);
list<Future<bool>> authorizations = _authorizations.get();
@@ -2898,12 +2311,11 @@ void Master::_accept(
}
// Validate the task.
- const Option<Error>& validationError = validateTask(
+ const Option<Error>& validationError = validation::task::validate(
task,
framework,
slave,
- _offeredResources,
- usedResources);
+ _offeredResources);
if (validationError.isSome()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2925,7 +2337,7 @@ void Master::_accept(
// Add task.
if (pending) {
- usedResources += addTask(task, framework, slave);
+ _offeredResources -= addTask(task, framework, slave);
// TODO(bmahler): Consider updating this log message to
// indicate when the executor is also being launched.
@@ -2959,15 +2371,12 @@ void Master::_accept(
}
}
- // Calculate unused resources.
- Resources unusedResources = _offeredResources - usedResources;
-
- if (!unusedResources.empty()) {
+ if (!_offeredResources.empty()) {
// Tell the allocator about the unused (e.g., refused) resources.
allocator->recoverResources(
frameworkId,
slaveId,
- unusedResources,
+ _offeredResources,
accept.filters());
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index aff35f2..cd37ee9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -56,6 +56,7 @@
#include "master/flags.hpp"
#include "master/metrics.hpp"
#include "master/registrar.hpp"
+#include "master/validation.hpp"
#include "messages/messages.hpp"
@@ -86,10 +87,10 @@ class Repairer;
class SlaveObserver;
struct Framework;
-struct OfferValidator;
struct Role;
struct Slave;
+
class Master : public ProtobufProcess<Master>
{
public:
@@ -362,16 +363,6 @@ protected:
const std::vector<StatusUpdate>& updates,
const process::Future<bool>& removed);
- // Validates the task.
- // Returns None if the task is valid.
- // Returns Error if the task is invalid.
- Option<Error> validateTask(
- const TaskInfo& task,
- Framework* framework,
- Slave* slave,
- const Resources& totalResources,
- const Resources& usedResources);
-
// Authorizes the task.
// Returns true if task is authorized.
// Returns false if task is not authorized.
@@ -510,9 +501,17 @@ private:
Master(const Master&); // No copying.
Master& operator = (const Master&); // No assigning.
- friend struct OfferValidator;
friend struct Metrics;
+ // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
+ // make the following functions friends so that validation functions
+ // can get Offer* and Slave*.
+ friend Offer* validation::offer::getOffer(
+ Master* master, const OfferID& offerId);
+
+ friend Slave* validation::offer::getSlave(
+ Master* master, const SlaveID& slaveId);
+
const Flags flags;
Option<MasterInfo> leader; // Current leading master.
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
new file mode 100644
index 0000000..8804ba6
--- /dev/null
+++ b/src/master/validation.cpp
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "master/master.hpp"
+#include "master/validation.hpp"
+
+using std::string;
+using std::vector;
+
+using google::protobuf::RepeatedPtrField;
+
+namespace mesos {
+namespace internal {
+namespace master {
+namespace validation {
+
+// Returns true if 'c' must not appear in an ID: a control character,
+// '/' or '\\'. Cast first: iscntrl() on a negative char is undefined.
+static bool invalid(char c)
+{
+  return iscntrl(static_cast<unsigned char>(c)) || c == '/' || c == '\\';
+}
+
+namespace resource {
+
+// Returns an error for the first DiskInfo that is invalid or unsupported.
+// Only persistent volumes are accepted; they need a non-'*' role, a
+// 'volume' that is not RO and lacks 'host_path', and a well-formed ID.
+Option<Error> validateDiskInfo(const RepeatedPtrField<Resource>& resources)
+{
+  foreach (const Resource& resource, resources) {
+    if (!resource.has_disk()) {
+      continue;
+    }
+
+    if (resource.disk().has_persistence()) {
+      if (resource.role() == "*") {
+        return Error("'*' role not supported for persistent volume");
+      }
+      if (!resource.disk().has_volume()) {
+        return Error("Expecting 'volume' to be set for persistent volume");
+      }
+      if (resource.disk().volume().mode() == Volume::RO) {
+        return Error("Read-only persistent volume not supported");
+      }
+      if (resource.disk().volume().has_host_path()) {
+        return Error("Expecting 'host_path' to be unset for persistent volume");
+      }
+
+      // Ensure persistence ID does not have invalid characters.
+      const string& id = resource.disk().persistence().id();
+      if (std::any_of(id.begin(), id.end(), invalid)) {
+        return Error("Persistence ID '" + id + "' contains invalid characters");
+      }
+    } else if (resource.disk().has_volume()) {
+      return Error("Non-persistent volume not supported");
+    } else {
+      return Error("DiskInfo is set but empty");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates that persistence IDs are unique per role among the given
+// resources only; volumes already on the slave are not consulted here.
+Option<Error> validateUniquePersistenceID(
+    const RepeatedPtrField<Resource>& resources)
+{
+  hashmap<string, hashset<string>> persistenceIds;
+
+  foreach (const Resource& resource, resources) {
+    if (!resource.has_disk() || !resource.disk().has_persistence()) {
+      continue;
+    }
+
+    const string& role = resource.role();
+    const string& id = resource.disk().persistence().id();
+
+    if (persistenceIds.contains(role) &&
+        persistenceIds[role].contains(id)) {
+      return Error("Persistence ID '" + id + "' is not unique");
+    }
+
+    persistenceIds[role].insert(id);
+  }
+
+  return None();
+}
+
+
+// Validates 'resources' and their DiskInfos (not persistence ID uniqueness).
+Option<Error> validate(const RepeatedPtrField<Resource>& resources)
+{
+  Option<Error> error = Resources::validate(resources);
+  if (error.isSome()) {
+    return Error("Invalid resources: " + error.get().message);
+  }
+
+  error = validateDiskInfo(resources);
+  if (error.isSome()) {
+    return Error("Invalid DiskInfo: " + error.get().message);
+  }
+
+  return None();
+}
+
+} // namespace resource {
+
+
+namespace task {
+
+// Validates that the task's TaskID contains no character rejected by
+// invalid() (control characters, '/' or '\\').
+Option<Error> validateTaskID(const TaskInfo& task)
+{
+  const string& id = task.task_id().value();
+
+  if (std::any_of(id.begin(), id.end(), invalid)) {
+    return Error("TaskID '" + id + "' contains invalid characters");
+  }
+
+  return None();
+}
+
+
+// Validates that the TaskID does not collide with any task already
+// tracked in the framework's 'tasks'.
+Option<Error> validateUniqueTaskID(
+    const TaskInfo& task, Framework* framework)
+{
+  const TaskID& taskId = task.task_id();
+
+  if (framework->tasks.contains(taskId)) {
+    return Error("Task has duplicate ID: " + taskId.value());
+  }
+
+  return None();
+}
+
+
+// Validates that the task names the given slave in its 'slave_id'.
+Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
+{
+  if (task.slave_id() == slave->id) {
+    return None();
+  }
+
+  return Error(
+      "Task uses invalid slave " + task.slave_id().value() +
+      " while slave " + slave->id.value() + " is expected");
+}
+
+
+// Requires exactly one of CommandInfo/ExecutorInfo; an ExecutorInfo must
+// name this framework and equal the slave's existing one with that ID.
+Option<Error> validateExecutorInfo(
+ const TaskInfo& task, Framework* framework, Slave* slave)
+{
+ if (task.has_executor() == task.has_command()) {
+ return Error(
+ "Task should have at least one (but not both) of CommandInfo or "
+ "ExecutorInfo present");
+ }
+
+ if (task.has_executor()) {
+ // The master currently expects ExecutorInfo.framework_id to be
+ // set even though it is an optional field. Currently, the
+ // scheduler driver ensures that the field is set. For schedulers
+ // not using the driver, we need to do the validation here.
+ // TODO(bmahler): Set this field in the master instead of
+ // depending on the scheduler driver to do it.
+ if (!task.executor().has_framework_id()) {
+ return Error(
+ "Task has invalid ExecutorInfo: missing field 'framework_id'");
+ }
+
+ if (task.executor().framework_id() != framework->id) {
+ return Error(
+ "ExecutorInfo has an invalid FrameworkID"
+ " (Actual: " + stringify(task.executor().framework_id()) +
+ " vs Expected: " + stringify(framework->id) + ")");
+ }
+
+ const ExecutorID& executorId = task.executor().executor_id();
+ Option<ExecutorInfo> executorInfo = None();
+
+ if (slave->hasExecutor(framework->id, executorId)) {
+ executorInfo = slave->executors.get(framework->id).get().get(executorId);
+ }
+
+ if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
+ return Error(
+ "Task has invalid ExecutorInfo (existing ExecutorInfo"
+ " with same ExecutorID is not compatible).\n"
+ "------------------------------------------------------------\n"
+ "Existing ExecutorInfo:\n" +
+ stringify(executorInfo.get()) + "\n"
+ "------------------------------------------------------------\n"
+ "Task's ExecutorInfo:\n" +
+ stringify(task.executor()) + "\n"
+ "------------------------------------------------------------\n");
+ }
+ }
+
+ return None();
+}
+
+
+// Rejects launching a task of a checkpointing framework on a slave that
+// has checkpointing disabled.
+// TODO(jieyu): Remove this in favor of a CHECK, because the allocator
+// should filter these out.
+Option<Error> validateCheckpoint(Framework* framework, Slave* slave)
+{
+  if (!framework->info.checkpoint() || slave->info.checkpoint()) {
+    return None();
+  }
+
+  return Error(
+      "Task asked to be checkpointed but slave " +
+      stringify(slave->id) + " has checkpointing disabled");
+}
+
+// Validates the task's resources and, if present, its executor's
+// resources; persistence IDs must be unique (per role) across both.
+Option<Error> validateResources(const TaskInfo& task)
+{
+  Option<Error> error = resource::validate(task.resources());
+  if (error.isSome()) {
+    return Error("Task uses invalid resources: " + error.get().message);
+  }
+
+  // Accumulate task and executor resources so that the persistence
+  // ID uniqueness check below covers both.
+  Resources total = task.resources();
+
+  if (task.has_executor()) {
+    error = resource::validate(task.executor().resources());
+    if (error.isSome()) {
+      return Error("Executor uses invalid resources: " + error.get().message);
+    }
+
+    total += task.executor().resources();
+  }
+
+  // A duplicate persistence ID error is returned as-is, without the
+  // "uses invalid resources" prefix used above.
+  return resource::validateUniquePersistenceID(total);
+}
+
+
+// Validates that the task and the executor are using a proper amount of
+// resources. The task must request some resources. The resources needed
+// by the task (and by its executor, in case the executor is new on this
+// slave) must be contained in the resources offered on that slave.
+// An executor with less than MIN_CPUS / MIN_MEM only triggers a warning.
+Option<Error> validateResourceUsage(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered)
+{
+  const Resources taskResources = task.resources();
+
+  if (taskResources.empty()) {
+    return Error("Task uses no resources");
+  }
+
+  // Everything that has to fit within 'offered' for this launch.
+  Resources total = taskResources;
+
+  if (task.has_executor()) {
+    const ExecutorInfo& executor = task.executor();
+    const Resources executorResources = executor.resources();
+
+    // Validate minimal cpus and memory resources of executor and log
+    // warnings if not set.
+    // TODO(martin): MESOS-1807. Return Error instead of logging a
+    // warning in 0.22.0.
+    Option<double> cpus = executorResources.cpus();
+    if (cpus.isNone() || cpus.get() < MIN_CPUS) {
+      LOG(WARNING)
+        << "Executor " << stringify(executor.executor_id())
+        << " for task " << stringify(task.task_id())
+        << " uses less CPUs ("
+        << (cpus.isSome() ? stringify(cpus.get()) : "None")
+        << ") than the minimum required (" << MIN_CPUS
+        << "). Please update your executor, as this will be mandatory "
+        << "in future releases.";
+    }
+
+    Option<Bytes> mem = executorResources.mem();
+    if (mem.isNone() || mem.get() < MIN_MEM) {
+      // Print the actual amount as 'Bytes' too, so that it carries a
+      // unit just like MIN_MEM does.
+      LOG(WARNING)
+        << "Executor " << stringify(executor.executor_id())
+        << " for task " << stringify(task.task_id())
+        << " uses less memory ("
+        << (mem.isSome() ? stringify(mem.get()) : "None")
+        << ") than the minimum required (" << MIN_MEM
+        << "). Please update your executor, as this will be mandatory "
+        << "in future releases.";
+    }
+
+    // The executor's resources are only needed when the executor is
+    // new, i.e., the slave does not already have it for this framework.
+    if (!slave->hasExecutor(framework->id, executor.executor_id())) {
+      total += executorResources;
+    }
+  }
+
+  if (!offered.contains(total)) {
+    return Error(
+        "Task uses more resources " + stringify(total) +
+        " than available " + stringify(offered));
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave,
+ const Resources& offered)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ // NOTE: The order in which the following validate functions are
+ // executed does matter! For example, 'validateResourceUsage'
+ // assumes that ExecutorInfo is valid which is verified by
+ // 'validateExecutorInfo'.
+ vector<lambda::function<Option<Error>(void)>> validators = {
+ lambda::bind(validateTaskID, task),
+ lambda::bind(validateUniqueTaskID, task, framework),
+ lambda::bind(validateSlaveID, task, slave),
+ lambda::bind(validateExecutorInfo, task, framework, slave),
+ lambda::bind(validateCheckpoint, framework, slave),
+ lambda::bind(validateResources, task),
+ lambda::bind(validateResourceUsage, task, framework, slave, offered)
+ };
+
+ // TODO(benh): Add a validateHealthCheck function.
+
+ // TODO(jieyu): Add a validateCommandInfo function.
+
+ foreach (const lambda::function<Option<Error>(void)>& validator, validators) {
+ Option<Error> error = validator();
+ if (error.isSome()) {
+ return error;
+ }
+ }
+
+ return None();
+}
+
+} // namespace task {
+
+
+namespace offer {
+
+// Looks up an offer through the Master. This function is declared in
+// validation.hpp so that Master can befriend it.
+Offer* getOffer(Master* master, const OfferID& offerId)
+{
+  // CHECK_NOTNULL hands back its (verified) argument.
+  return CHECK_NOTNULL(master)->getOffer(offerId);
+}
+
+
+// Looks up a slave through the Master. This function is declared in
+// validation.hpp so that Master can befriend it.
+Slave* getSlave(Master* master, const SlaveID& slaveId)
+{
+  // CHECK_NOTNULL hands back its (verified) argument.
+  return CHECK_NOTNULL(master)->getSlave(slaveId);
+}
+
+
+// Validates that no offer ID appears more than once in the list.
+Option<Error> validateUniqueOfferID(const RepeatedPtrField<OfferID>& offerIds)
+{
+  hashset<OfferID> seen;
+
+  foreach (const OfferID& offerId, offerIds) {
+    // 'insert' reports 'false' in '.second' when the ID was already seen.
+    if (!seen.insert(offerId).second) {
+      return Error("Duplicate offer " + stringify(offerId) + " in offer list");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates that every offer still exists and was made to 'framework'.
+Option<Error> validateFramework(
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework)
+{
+  foreach (const OfferID& offerId, offerIds) {
+    const Offer* offer = getOffer(master, offerId);
+
+    if (offer == NULL) {
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    }
+
+    const FrameworkID& owner = offer->framework_id();
+
+    if (framework->id != owner) {
+      return Error(
+          "Offer " + stringify(offer->id()) +
+          " has invalid framework " + stringify(owner) +
+          " while framework " + stringify(framework->id) + " is expected");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates that every offer still exists and that all of them were
+// made on one and the same slave.
+Option<Error> validateSlave(
+    const RepeatedPtrField<OfferID>& offerIds, Master* master)
+{
+  // Slave of the first offer; every subsequent offer must match it.
+  Option<SlaveID> expected;
+
+  foreach (const OfferID& offerId, offerIds) {
+    const Offer* offer = getOffer(master, offerId);
+
+    if (offer == NULL) {
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    }
+
+    Slave* slave = getSlave(master, offer->slave_id());
+
+    // Neither check can fail: the offer should have been removed together
+    // with a removed or disconnected slave.
+    CHECK(slave != NULL)
+      << "Offer " << offerId
+      << " outlived slave " << offer->slave_id();
+
+    CHECK(slave->connected)
+      << "Offer " << offerId
+      << " outlived disconnected slave " << *slave;
+
+    if (expected.isNone()) {
+      expected = slave->id;
+    } else if (slave->id != expected.get()) {
+      return Error(
+          "Aggregated offers must belong to one single slave. Offer " +
+          stringify(offerId) + " uses slave " +
+          stringify(slave->id) + " and slave " +
+          stringify(expected.get()));
+    }
+  }
+
+  return None();
+}
+
+
+// Runs the offer validators in order and returns the first error, or
+// None() when all of them pass.
+Option<Error> validate(
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework)
+{
+  CHECK_NOTNULL(master);
+  CHECK_NOTNULL(framework);
+
+  // A later validator only runs if every earlier one succeeded.
+  Option<Error> error = validateUniqueOfferID(offerIds);
+
+  if (error.isNone()) {
+    error = validateFramework(offerIds, master, framework);
+  }
+
+  if (error.isNone()) {
+    error = validateSlave(offerIds, master);
+  }
+
+  return error;
+}
+
+} // namespace offer {
+
+
+namespace operation {
+
+// Placeholder: no validation is implemented for offer operations yet.
+// TODO(jieyu): Add validate functions for Offer operations.
+
+} // namespace operation {
+
+} // namespace validation {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
new file mode 100644
index 0000000..642c375
--- /dev/null
+++ b/src/master/validation.hpp
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_VALIDATION_HPP__
+#define __MASTER_VALIDATION_HPP__
+
+#include <google/protobuf/repeated_field.h>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+class Master;
+
+struct Framework;
+struct Slave;
+
+namespace validation {
+
+namespace resource {
+
+// Validates resources specified by frameworks. Returns an Error if the
+// resources are invalid, None() otherwise.
+// NOTE: We cannot take 'Resources' here because invalid resources are
+// silently ignored within its constructor.
+Option<Error> validate(
+    const google::protobuf::RepeatedPtrField<Resource>& resources);
+
+} // namespace resource {
+
+
+namespace task {
+
+// Validates a task that a framework attempts to launch within the
+// offered resources. Returns an optional error which will cause the
+// master to send a failed status update back to the framework.
+// NOTE: This function must be called sequentially for each task, and
+// each task needs to be launched before the next can be validated.
+Option<Error> validate(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered);
+
+} // namespace task {
+
+
+namespace offer {
+
+// NOTE: These two functions are placed in the header file because we
+// need to declare them as friends of Master.
+Offer* getOffer(Master* master, const OfferID& offerId);
+Slave* getSlave(Master* master, const SlaveID& slaveId);
+
+
+// Validates the given offers. Returns an Error if an offer ID appears
+// more than once, an offer is no longer valid, an offer belongs to a
+// framework other than 'framework', or the offers do not all belong to
+// the same slave. Returns None() otherwise.
+Option<Error> validate(
+    const google::protobuf::RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework);
+
+} // namespace offer {
+
+
+namespace operation {
+
+// TODO(jieyu): Add validate functions for Offer operations.
+
+} // namespace operation {
+
+} // namespace validation {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_VALIDATION_HPP__