You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/03 19:58:49 UTC

mesos git commit: Refactored task/offer/resource validation in master.

Repository: mesos
Updated Branches:
  refs/heads/master 01d391b4a -> b22d7addb


Refactored task/offer/resource validation in master.

Review: https://reviews.apache.org/r/30459


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b22d7add
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b22d7add
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b22d7add

Branch: refs/heads/master
Commit: b22d7addbc03dfe4a5aa63a05e4f805b1c15631d
Parents: 01d391b
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jan 30 11:12:13 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 3 10:57:45 2015 -0800

----------------------------------------------------------------------
 src/Makefile.am           |   2 +
 src/common/type_utils.hpp |  28 +-
 src/master/master.cpp     | 607 +----------------------------------------
 src/master/master.hpp     |  23 +-
 src/master/validation.cpp | 526 +++++++++++++++++++++++++++++++++++
 src/master/validation.hpp |  91 ++++++
 6 files changed, 658 insertions(+), 619 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 07bea1f..5b1885d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -295,6 +295,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	master/registry.proto						\
 	master/registrar.cpp						\
 	master/repairer.cpp						\
+	master/validation.cpp						\
 	module/manager.cpp						\
 	sched/constants.cpp						\
 	sched/sched.cpp							\
@@ -463,6 +464,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	master/repairer.hpp						\
 	master/registrar.hpp						\
 	master/sorter.hpp						\
+	master/validation.hpp						\
 	messages/messages.hpp						\
 	module/authenticatee.hpp					\
 	module/authenticator.hpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 6809a12..b23eeb8 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -55,14 +55,6 @@ inline bool operator == (const ExecutorID& left, const ExecutorID& right)
 }
 
 
-inline std::ostream& operator << (
-    std::ostream& stream,
-    const ContainerInfo& containerInfo)
-{
-  return stream << containerInfo.DebugString();
-}
-
-
 inline bool operator == (const FrameworkID& left, const FrameworkID& right)
 {
   return left.value() == right.value();
@@ -135,6 +127,18 @@ inline bool operator != (const ContainerID& left, const ContainerID& right)
 }
 
 
+inline bool operator != (const FrameworkID& left, const FrameworkID& right)
+{
+  return left.value() != right.value();
+}
+
+
+inline bool operator != (const SlaveID& left, const SlaveID& right)
+{
+  return left.value() != right.value();
+}
+
+
 inline bool operator < (const ContainerID& left, const ContainerID& right)
 {
   return left.value() < right.value();
@@ -237,6 +241,14 @@ inline std::ostream& operator << (
 
 inline std::ostream& operator << (
     std::ostream& stream,
+    const ContainerInfo& containerInfo)
+{
+  return stream << containerInfo.DebugString();
+}
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
     const ExecutorID& executorId)
 {
   return stream << executorId.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e737fcb..d04b2c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1896,530 +1896,6 @@ void Master::resourceRequest(
 }
 
 
-struct ResourcesValidator
-{
-  virtual Option<Error> operator () (
-      const Resources& resources,
-      const Slave& slave) = 0;
-
-  virtual ~ResourcesValidator() {}
-};
-
-
-struct DiskInfoValidator : ResourcesValidator
-{
-  virtual Option<Error> operator () (
-      const Resources& resources,
-      const Slave& slave)
-  {
-    foreach (const Resource& resource, resources) {
-      if (!resource.has_disk()) {
-        continue;
-      }
-
-      if (resource.disk().has_persistence()) {
-        if (resource.role() == "*") {
-          return Error(
-              "Invalid DiskInfo: '*' role not "
-              "supported for persistent volume");
-        }
-        if (!resource.disk().has_volume()) {
-          return Error(
-              "Invalid DiskInfo: expecting 'volume' "
-              "to be set for persistent volume");
-        }
-        if (resource.disk().volume().mode() == Volume::RO) {
-          return Error(
-              "Invalid DiskInfo: read-only persistent "
-              "volume not supported");
-        }
-        if (resource.disk().volume().has_host_path()) {
-          return Error(
-              "Invalid DiskInfo: expecting 'host_path' "
-              "to be unset for persistent volume");
-        }
-
-        // Ensure persistence ID does not have invalid characters.
-        string id = resource.disk().persistence().id();
-        if (std::count_if(id.begin(), id.end(), invalid) > 0) {
-          return Error(
-              "Invalid DiskInfo: persistence ID '" + id +
-              "' contains invalid characters");
-        }
-      } else if (resource.disk().has_volume()) {
-        return Error("Invalid DiskInfo: non-persistent volume not supported");
-      } else {
-        return Error("Invalid DiskInfo: empty");
-      }
-    }
-
-    return None();
-  }
-
-  static bool invalid(char c)
-  {
-    return iscntrl(c) || c == '/' || c == '\\';
-  }
-};
-
-
-struct UniquePersistenceIDValidator : ResourcesValidator
-{
-  virtual Option<Error> operator () (
-      const Resources& resources,
-      const Slave& slave)
-  {
-    hashmap<string, hashset<string>> persistenceIds;
-
-    // Check duplicated persistence ID within the given resources.
-    foreach (const Resource& resource, resources) {
-      if (!resource.has_disk() || !resource.disk().has_persistence()) {
-        continue;
-      }
-
-      const string& role = resource.role();
-      const string& id = resource.disk().persistence().id();
-
-      if (persistenceIds.contains(role) &&
-          persistenceIds[role].contains(id)) {
-        return Error("Persistence ID '" + id + "' is not unique");
-      }
-
-      persistenceIds[role].insert(id);
-    }
-
-    return None();
-  }
-};
-
-
-// Abstraction for performing any validations, aggregations, etc. of
-// tasks that a framework attempts to run within the resources
-// provided by offers. A validator can return an optional error which
-// will cause the master to send a failed status update back to the
-// framework for only that task. An instance will be reused for each
-// task from same 'launchTasks()', but not for task from different
-// offers.
-struct TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources) = 0;
-
-  virtual ~TaskInfoValidator() {}
-};
-
-
-// Validates that a task id is valid, i.e., contains only valid
-// characters.
-struct TaskIDValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    const string& id = task.task_id().value();
-
-    if (std::count_if(id.begin(), id.end(), invalid) > 0) {
-      return Error("TaskID '" + id + "' contains invalid characters");
-    }
-
-    return None();
-  }
-
-  static bool invalid(char c)
-  {
-    return iscntrl(c) || c == '/' || c == '\\';
-  }
-};
-
-
-// Validates that the slave ID used by a task is correct.
-struct SlaveIDValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    if (!(task.slave_id() == slave.id)) {
-      return Error(
-          "Task uses invalid slave " + task.slave_id().value() +
-          " while slave " + slave.id.value() + " is expected");
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that each task uses a unique ID. Regardless of whether a
-// task actually gets launched (for example, another validator may
-// return an error for a task), we always consider it an error when a
-// task tries to re-use an ID.
-struct UniqueTaskIDValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    const TaskID& taskId = task.task_id();
-
-    if (framework.tasks.contains(taskId)) {
-      return Error("Task has duplicate ID: " + taskId.value());
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that resources specified by the framework are valid.
-struct ResourceValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    Option<Error> error = Resources::validate(task.resources());
-    if (error.isSome()) {
-      return Error("Task uses invalid resources: " + error.get().message);
-    }
-
-    Resources resources = task.resources();
-
-    if (task.has_executor()) {
-      Option<Error> error = Resources::validate(task.executor().resources());
-      if (error.isSome()) {
-        return Error(
-            "Executor uses invalid resources: " + error.get().message);
-      }
-
-      resources += task.executor().resources();
-    }
-
-    vector<Owned<ResourcesValidator>> validators = {
-      Owned<ResourcesValidator>(new DiskInfoValidator()),
-      Owned<ResourcesValidator>(new UniquePersistenceIDValidator())
-    };
-
-    foreach (const Owned<ResourcesValidator>& validator, validators) {
-      error = (*validator)(resources, slave);
-      if (error.isSome()) {
-        return Error(error.get().message);
-      }
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that the task and the executor are using proper amount of
-// resources. For instance, the used resources by a task on each slave
-// should not exceed the total resources offered on that slave.
-struct ResourceUsageValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    Resources taskResources = task.resources();
-
-    if (taskResources.empty()) {
-      return Error("Task uses no resources");
-    }
-
-    Resources executorResources;
-    if (task.has_executor()) {
-      executorResources = task.executor().resources();
-    }
-
-    // Validate minimal cpus and memory resources of executor and log
-    // warnings if not set.
-    if (task.has_executor()) {
-      // TODO(martin): MESOS-1807. Return Error instead of logging a
-      // warning in 0.22.0.
-      Option<double> cpus =  executorResources.cpus();
-      if (cpus.isNone() || cpus.get() < MIN_CPUS) {
-        LOG(WARNING)
-          << "Executor " << stringify(task.executor().executor_id())
-          << " for task " << stringify(task.task_id())
-          << " uses less CPUs ("
-          << (cpus.isSome() ? stringify(cpus.get()) : "None")
-          << ") than the minimum required (" << MIN_CPUS
-          << "). Please update your executor, as this will be mandatory "
-          << "in future releases.";
-      }
-
-      Option<Bytes> mem = executorResources.mem();
-      if (mem.isNone() || mem.get() < MIN_MEM) {
-        LOG(WARNING)
-          << "Executor " << stringify(task.executor().executor_id())
-          << " for task " << stringify(task.task_id())
-          << " uses less memory ("
-          << (mem.isSome() ? stringify(mem.get().megabytes()) : "None")
-          << ") than the minimum required (" << MIN_MEM
-          << "). Please update your executor, as this will be mandatory "
-          << "in future releases.";
-      }
-    }
-
-    // Validate if resources needed by the task (and its executor in
-    // case the executor is new) are available.
-    Resources resources = taskResources;
-    if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
-      resources += executorResources;
-    }
-
-    if (!offeredResources.contains(resources + usedResources)) {
-      return Error(
-          "Task uses more resources " + stringify(resources) +
-          " than available " + stringify(offeredResources - usedResources));
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that tasks that use the "same" executor (i.e., same
-// ExecutorID) have an identical ExecutorInfo.
-struct ExecutorInfoValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    if (task.has_executor() == task.has_command()) {
-      return Error(
-          "Task should have at least one (but not both) of CommandInfo or "
-          "ExecutorInfo present");
-    }
-
-    if (task.has_executor()) {
-      // The master currently expects ExecutorInfo.framework_id to be
-      // set even though it is an optional field. Currently, the
-      // scheduler driver ensures that the field is set. For
-      // schedulers not using the driver, we need to do the validation
-      // here.
-      // TODO(bmahler): Set this field in the master instead of
-      // depending on the scheduler driver do it.
-      if (!task.executor().has_framework_id()) {
-        return Error(
-            "Task has invalid ExecutorInfo: missing field 'framework_id'");
-      }
-
-      if (!(task.executor().framework_id() == framework.id)) {
-        return Error(
-            "ExecutorInfo has an invalid FrameworkID"
-            " (Actual: " + stringify(task.executor().framework_id()) +
-            " vs Expected: " + stringify(framework.id) + ")");
-      }
-
-      const ExecutorID& executorId = task.executor().executor_id();
-      Option<ExecutorInfo> executorInfo = None();
-
-      if (slave.hasExecutor(framework.id, executorId)) {
-        executorInfo = slave.executors.get(framework.id).get().get(executorId);
-      }
-
-      if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
-        return Error(
-            "Task has invalid ExecutorInfo (existing ExecutorInfo"
-            " with same ExecutorID is not compatible).\n"
-            "------------------------------------------------------------\n"
-            "Existing ExecutorInfo:\n" +
-            stringify(executorInfo.get()) + "\n"
-            "------------------------------------------------------------\n"
-            "Task's ExecutorInfo:\n" +
-            stringify(task.executor()) + "\n"
-            "------------------------------------------------------------\n");
-      }
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that a task that asks for checkpointing is not being
-// launched on a slave that has not enabled checkpointing.
-struct CheckpointValidator : TaskInfoValidator
-{
-  virtual Option<Error> operator () (
-      const TaskInfo& task,
-      const Framework& framework,
-      const Slave& slave,
-      const Resources& offeredResources,
-      const Resources& usedResources)
-  {
-    if (framework.info.checkpoint() && !slave.info.checkpoint()) {
-      return Error(
-          "Task asked to be checkpointed but slave " +
-          stringify(slave.id) + " has checkpointing disabled");
-    }
-
-    return None();
-  }
-};
-
-
-// OfferValidators are similar to the TaskInfoValidator pattern and
-// are used for validation and aggregation of offers. The error
-// reporting scheme is also similar to TaskInfoValidator. However,
-// offer processing (and subsequent task processing) is aborted
-// altogether if offer validator reports an error.
-struct OfferValidator
-{
-  virtual Option<Error> operator () (
-      const OfferID& offerId,
-      const Framework& framework,
-      Master* master) = 0;
-
-  virtual ~OfferValidator() {}
-
-  Slave* getSlave(Master* master, const SlaveID& slaveId)
-  {
-    CHECK_NOTNULL(master);
-    return master->getSlave(slaveId);
-  }
-
-  Offer* getOffer(Master* master, const OfferID& offerId)
-  {
-    CHECK_NOTNULL(master);
-    return master->getOffer(offerId);
-  }
-};
-
-
-// Validates the validity/liveness of an offer.
-struct ValidOfferValidator : OfferValidator
-{
-  virtual Option<Error> operator () (
-      const OfferID& offerId,
-      const Framework& framework,
-      Master* master)
-  {
-    Offer* offer = getOffer(master, offerId);
-    if (offer == NULL) {
-      return Error("Offer " + stringify(offerId) + " is no longer valid");
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that an offer belongs to the expected framework.
-struct FrameworkValidator : OfferValidator
-{
-  virtual Option<Error> operator () (
-      const OfferID& offerId,
-      const Framework& framework,
-      Master* master)
-  {
-    Offer* offer = getOffer(master, offerId);
-    if (offer == NULL) {
-      return Error("Offer " + stringify(offerId) + " is no longer valid");
-    }
-
-    if (!(framework.id == offer->framework_id())) {
-      return Error(
-          "Offer " + stringify(offer->id()) +
-          " has invalid framework " + stringify(offer->framework_id()) +
-          " while framework " + stringify(framework.id) + " is expected");
-    }
-
-    return None();
-  }
-};
-
-
-// Validates that the slave is valid and ensures that all offers
-// belong to the same slave.
-struct SlaveValidator : OfferValidator
-{
-  virtual Option<Error> operator () (
-      const OfferID& offerId,
-      const Framework& framework,
-      Master* master)
-  {
-    Offer* offer = getOffer(master, offerId);
-    if (offer == NULL) {
-      return Error("Offer " + stringify(offerId) + " is no longer valid");
-    }
-
-    Slave* slave = getSlave(master, offer->slave_id());
-
-    // This is not possible because the offer should've been removed.
-    CHECK(slave != NULL)
-      << "Offer " << offerId
-      << " outlived slave " << offer->slave_id();
-
-    // This is not possible because the offer should've been removed.
-    CHECK(slave->connected)
-      << "Offer " << offerId
-      << " outlived disconnected slave " << *slave;
-
-    if (slaveId.isNone()) {
-      // Set slave id and use as base case for validation.
-      slaveId = slave->id;
-    } else if (!(slave->id == slaveId.get())) {
-      return Error(
-          "Aggregated offers must belong to one single slave. Offer " +
-          stringify(offerId) + " uses slave " +
-          stringify(slave->id) + " and slave " +
-          stringify(slaveId.get()));
-    }
-
-    return None();
-  }
-
-  Option<const SlaveID> slaveId;
-};
-
-
-// Validates that an offer only appears once in offer list.
-struct UniqueOfferIDValidator : OfferValidator
-{
-  virtual Option<Error> operator () (
-      const OfferID& offerId,
-      const Framework& framework,
-      Master* master)
-  {
-    if (offers.contains(offerId)) {
-      return Error("Duplicate offer " + stringify(offerId) + " in offer list");
-    }
-    offers.insert(offerId);
-
-    return None();
-  }
-
-  hashset<OfferID> offers;
-};
-
-
 void Master::launchTasks(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -2472,54 +1948,6 @@ void Master::launchTasks(
 }
 
 
-Option<Error> Master::validateTask(
-    const TaskInfo& task,
-    Framework* framework,
-    Slave* slave,
-    const Resources& offeredResources,
-    const Resources& usedResources)
-{
-  CHECK_NOTNULL(framework);
-  CHECK_NOTNULL(slave);
-
-  // Create task validators.
-  // NOTE: The order in which the following validators are executed
-  // does matter! For example, ResourceUsageValidator assumes that
-  // ExecutorInfo is valid which is verified by ExecutorInfoValidator.
-  // TODO(vinod): Create the validators on the stack and make the
-  // validate operation const.
-  vector<Owned<TaskInfoValidator>> validators = {
-    Owned<TaskInfoValidator>(new TaskIDValidator()),
-    Owned<TaskInfoValidator>(new SlaveIDValidator()),
-    Owned<TaskInfoValidator>(new UniqueTaskIDValidator()),
-    Owned<TaskInfoValidator>(new CheckpointValidator()),
-    Owned<TaskInfoValidator>(new ExecutorInfoValidator()),
-    Owned<TaskInfoValidator>(new ResourceValidator()),
-    Owned<TaskInfoValidator>(new ResourceUsageValidator())
-  };
-
-  // TODO(benh): Add a HealthCheckValidator.
-
-  // TODO(jieyu): Add a CommandInfoValidator.
-
-  // Invoke each validator.
-  foreach (const Owned<TaskInfoValidator>& validator, validators) {
-    Option<Error> error = (*validator)(
-        task,
-        *framework,
-        *slave,
-        offeredResources,
-        usedResources);
-
-    if (error.isSome()) {
-      return Error(error.get().message);
-    }
-  }
-
-  return None();
-}
-
-
 Future<bool> Master::authorizeTask(
     const TaskInfo& task,
     Framework* framework)
@@ -2628,21 +2056,8 @@ void Master::accept(
   if (accept.offer_ids().size() == 0) {
     error = Error("No offers specified");
   } else {
-    vector<Owned<OfferValidator>> validators = {
-      Owned<OfferValidator>(new ValidOfferValidator()),
-      Owned<OfferValidator>(new FrameworkValidator()),
-      Owned<OfferValidator>(new SlaveValidator()),
-      Owned<OfferValidator>(new UniqueOfferIDValidator())
-    };
-
     // Validate the offers.
-    foreach (const OfferID& offerId, accept.offer_ids()) {
-      foreach (const Owned<OfferValidator>& validator, validators) {
-        if (error.isNone()) {
-          error = (*validator)(offerId, *framework, this);
-        }
-      }
-    }
+    error = validation::offer::validate(accept.offer_ids(), this, framework);
 
     // Compute offered resources and remove the offers. If the
     // validation failed, return resources to the allocator.
@@ -2810,12 +2225,10 @@ void Master::_accept(
   }
 
   // Some offer operations update the offered resources. We keep
-  // updated offered resources here.
+  // updated offered resources here. When a task is successfully
+  // launched, we remove its resource from offered resources.
   Resources _offeredResources = offeredResources;
 
-  // Accumulated resources used.
-  Resources usedResources;
-
   CHECK_READY(_authorizations);
   list<Future<bool>> authorizations = _authorizations.get();
 
@@ -2898,12 +2311,11 @@ void Master::_accept(
           }
 
           // Validate the task.
-          const Option<Error>& validationError = validateTask(
+          const Option<Error>& validationError = validation::task::validate(
               task,
               framework,
               slave,
-              _offeredResources,
-              usedResources);
+              _offeredResources);
 
           if (validationError.isSome()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2925,7 +2337,7 @@ void Master::_accept(
 
           // Add task.
           if (pending) {
-            usedResources += addTask(task, framework, slave);
+            _offeredResources -= addTask(task, framework, slave);
 
             // TODO(bmahler): Consider updating this log message to
             // indicate when the executor is also being launched.
@@ -2959,15 +2371,12 @@ void Master::_accept(
     }
   }
 
-  // Calculate unused resources.
-  Resources unusedResources = _offeredResources - usedResources;
-
-  if (!unusedResources.empty()) {
+  if (!_offeredResources.empty()) {
     // Tell the allocator about the unused (e.g., refused) resources.
     allocator->recoverResources(
         frameworkId,
         slaveId,
-        unusedResources,
+        _offeredResources,
         accept.filters());
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index aff35f2..cd37ee9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -56,6 +56,7 @@
 #include "master/flags.hpp"
 #include "master/metrics.hpp"
 #include "master/registrar.hpp"
+#include "master/validation.hpp"
 
 #include "messages/messages.hpp"
 
@@ -86,10 +87,10 @@ class Repairer;
 class SlaveObserver;
 
 struct Framework;
-struct OfferValidator;
 struct Role;
 struct Slave;
 
+
 class Master : public ProtobufProcess<Master>
 {
 public:
@@ -362,16 +363,6 @@ protected:
       const std::vector<StatusUpdate>& updates,
       const process::Future<bool>& removed);
 
-  // Validates the task.
-  // Returns None if the task is valid.
-  // Returns Error if the task is invalid.
-  Option<Error> validateTask(
-      const TaskInfo& task,
-      Framework* framework,
-      Slave* slave,
-      const Resources& totalResources,
-      const Resources& usedResources);
-
   // Authorizes the task.
   // Returns true if task is authorized.
   // Returns false if task is not authorized.
@@ -510,9 +501,17 @@ private:
   Master(const Master&);              // No copying.
   Master& operator = (const Master&); // No assigning.
 
-  friend struct OfferValidator;
   friend struct Metrics;
 
+  // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
+  // make the following functions friends so that validation functions
+  // can get Offer* and Slave*.
+  friend Offer* validation::offer::getOffer(
+      Master* master, const OfferID& offerId);
+
+  friend Slave* validation::offer::getSlave(
+      Master* master, const SlaveID& slaveId);
+
   const Flags flags;
 
   Option<MasterInfo> leader; // Current leading master.

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
new file mode 100644
index 0000000..8804ba6
--- /dev/null
+++ b/src/master/validation.cpp
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "master/master.hpp"
+#include "master/validation.hpp"
+
+using std::string;
+using std::vector;
+
+using google::protobuf::RepeatedPtrField;
+
+namespace mesos {
+namespace internal {
+namespace master {
+namespace validation {
+
+// Returns true if 'c' is not allowed in an ID: control characters,
+// '/' and '\\'. Cast first: iscntrl() is undefined for negative char.
+static bool invalid(char c)
+{
+  return iscntrl(static_cast<unsigned char>(c)) || c == '/' || c == '\\';
+}
+
+namespace resource {
+
+// Validates the DiskInfos specified in the given resources (if any
+// exist). Returns an error if any DiskInfo is found to be invalid or
+// unsupported.
+Option<Error> validateDiskInfo(const RepeatedPtrField<Resource>& resources)
+{
+  foreach (const Resource& resource, resources) {
+    if (!resource.has_disk()) {
+      continue;
+    }
+
+    if (resource.disk().has_persistence()) {
+      if (resource.role() == "*") {
+        return Error("'*' role not supported for persistent volume");
+      }
+      if (!resource.disk().has_volume()) {
+        return Error("Expecting 'volume' to be set for persistent volume");
+      }
+      if (resource.disk().volume().mode() == Volume::RO) {
+        return Error("Read-only persistent volume not supported");
+      }
+      if (resource.disk().volume().has_host_path()) {
+        return Error("Expecting 'host_path' to be unset for persistent volume");
+      }
+
+      // Ensure persistence ID does not have invalid characters.
+      string id = resource.disk().persistence().id();
+      if (std::count_if(id.begin(), id.end(), invalid) > 0) {
+        return Error("Persistence ID '" + id + "' contains invalid characters");
+      }
+    } else if (resource.disk().has_volume()) {
+      return Error("Non-persistent volume not supported");
+    } else {
+      return Error("DiskInfo is set but empty");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates the uniqueness of the persistence IDs used in the given
+// resources. They need to be unique per role on each slave.
+Option<Error> validateUniquePersistenceID(
+    const RepeatedPtrField<Resource>& resources)
+{
+  hashmap<string, hashset<string>> persistenceIds;
+
+  // Check duplicated persistence ID within the given resources.
+  foreach (const Resource& resource, resources) {
+    if (!resource.has_disk() || !resource.disk().has_persistence()) {
+      continue;
+    }
+
+    const string& role = resource.role();
+    const string& id = resource.disk().persistence().id();
+
+    if (persistenceIds.contains(role) &&
+        persistenceIds[role].contains(id)) {
+      return Error("Persistence ID '" + id + "' is not unique");
+    }
+
+    persistenceIds[role].insert(id);
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(const RepeatedPtrField<Resource>& resources)
+{
+  Option<Error> error = Resources::validate(resources);
+  if (error.isSome()) {
+    return Error("Invalid resources: " + error.get().message);
+  }
+
+  error = validateDiskInfo(resources);
+  if (error.isSome()) {
+    return Error("Invalid DiskInfo: " + error.get().message);
+  }
+
+  return None();
+}
+
+} // namespace resource {
+
+
+namespace task {
+
+// Validates that a task id is valid, i.e., contains only valid
+// characters.
+Option<Error> validateTaskID(const TaskInfo& task)
+{
+  const string& id = task.task_id().value();
+
+  if (std::count_if(id.begin(), id.end(), invalid) > 0) {
+    return Error("TaskID '" + id + "' contains invalid characters");
+  }
+
+  return None();
+}
+
+
+// Validates that the TaskID does not collide with any existing tasks
+// for the framework.
+Option<Error> validateUniqueTaskID(const TaskInfo& task, Framework*
+    framework)
+{
+  const TaskID& taskId = task.task_id();
+
+  if (framework->tasks.contains(taskId)) {
+    return Error("Task has duplicate ID: " + taskId.value());
+  }
+
+  return None();
+}
+
+
+// Validates that the slave ID used by a task is correct.
+Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
+{
+  if (task.slave_id() != slave->id) {
+    return Error(
+        "Task uses invalid slave " + task.slave_id().value() +
+        " while slave " + slave->id.value() + " is expected");
+  }
+
+  return None();
+}
+
+
+// Validates that tasks that use the "same" executor (i.e., same
+// ExecutorID) have an identical ExecutorInfo.
+Option<Error> validateExecutorInfo(
+    const TaskInfo& task, Framework* framework, Slave* slave)
+{
+  if (task.has_executor() == task.has_command()) {
+    return Error(
+        "Task should have at least one (but not both) of CommandInfo or "
+        "ExecutorInfo present");
+  }
+
+  if (task.has_executor()) {
+    // The master currently expects ExecutorInfo.framework_id to be
+    // set even though it is an optional field. Currently, the
+    // scheduler driver ensures that the field is set. For schedulers
+    // not using the driver, we need to do the validation here.
+    // TODO(bmahler): Set this field in the master instead of
+    // depending on the scheduler driver to do it.
+    if (!task.executor().has_framework_id()) {
+      return Error(
+          "Task has invalid ExecutorInfo: missing field 'framework_id'");
+    }
+
+    if (task.executor().framework_id() != framework->id) {
+      return Error(
+          "ExecutorInfo has an invalid FrameworkID"
+          " (Actual: " + stringify(task.executor().framework_id()) +
+          " vs Expected: " + stringify(framework->id) + ")");
+    }
+
+    const ExecutorID& executorId = task.executor().executor_id();
+    Option<ExecutorInfo> executorInfo = None();
+
+    if (slave->hasExecutor(framework->id, executorId)) {
+      executorInfo = slave->executors.get(framework->id).get().get(executorId);
+    }
+
+    if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
+      return Error(
+          "Task has invalid ExecutorInfo (existing ExecutorInfo"
+          " with same ExecutorID is not compatible).\n"
+          "------------------------------------------------------------\n"
+          "Existing ExecutorInfo:\n" +
+          stringify(executorInfo.get()) + "\n"
+          "------------------------------------------------------------\n"
+          "Task's ExecutorInfo:\n" +
+          stringify(task.executor()) + "\n"
+          "------------------------------------------------------------\n");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates that a task that asks for checkpointing is not being
+// launched on a slave that has not enabled checkpointing.
+// TODO(jieyu): Remove this in favor of a CHECK, because the allocator
+// should filter these out.
+Option<Error> validateCheckpoint(Framework* framework, Slave* slave)
+{
+  if (framework->info.checkpoint() && !slave->info.checkpoint()) {
+    return Error(
+        "Task asked to be checkpointed but slave " +
+        stringify(slave->id) + " has checkpointing disabled");
+  }
+
+  return None();
+}
+
+// Validates that the resources specified by the framework are valid.
+Option<Error> validateResources(const TaskInfo& task)
+{
+  Option<Error> error = resource::validate(task.resources());
+  if (error.isSome()) {
+    return Error("Task uses invalid resources: " + error.get().message);
+  }
+
+  Resources total = task.resources();
+
+  if (task.has_executor()) {
+    error = resource::validate(task.executor().resources());
+    if (error.isSome()) {
+      return Error("Executor uses invalid resources: " + error.get().message);
+    }
+
+    total += task.executor().resources();
+  }
+
+  error = resource::validateUniquePersistenceID(total);
+  if (error.isSome()) {
+    return error;
+  }
+
+  return None();
+}
+
+
+// Validates that the task and the executor are using a proper amount
+// of resources. For instance, the resources used by a task on each
+// slave should not exceed the total resources offered on that slave.
+Option<Error> validateResourceUsage(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered)
+{
+  Resources taskResources = task.resources();
+
+  if (taskResources.empty()) {
+    return Error("Task uses no resources");
+  }
+
+  Resources executorResources;
+  if (task.has_executor()) {
+    executorResources = task.executor().resources();
+  }
+
+  // Validate minimal cpus and memory resources of executor and log
+  // warnings if not set.
+  if (task.has_executor()) {
+    // TODO(martin): MESOS-1807. Return Error instead of logging a
+    // warning in 0.22.0.
+    Option<double> cpus =  executorResources.cpus();
+    if (cpus.isNone() || cpus.get() < MIN_CPUS) {
+      LOG(WARNING)
+        << "Executor " << stringify(task.executor().executor_id())
+        << " for task " << stringify(task.task_id())
+        << " uses less CPUs ("
+        << (cpus.isSome() ? stringify(cpus.get()) : "None")
+        << ") than the minimum required (" << MIN_CPUS
+        << "). Please update your executor, as this will be mandatory "
+        << "in future releases.";
+    }
+
+    Option<Bytes> mem = executorResources.mem();
+    if (mem.isNone() || mem.get() < MIN_MEM) {
+      LOG(WARNING)
+        << "Executor " << stringify(task.executor().executor_id())
+        << " for task " << stringify(task.task_id())
+        << " uses less memory ("
+        << (mem.isSome() ? stringify(mem.get().megabytes()) : "None")
+        << ") than the minimum required (" << MIN_MEM
+        << "). Please update your executor, as this will be mandatory "
+        << "in future releases.";
+    }
+  }
+
+  // Validate if resources needed by the task (and its executor in
+  // case the executor is new) are available.
+  Resources total = taskResources;
+  if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
+    total += executorResources;
+  }
+
+  if (!offered.contains(total)) {
+    return Error(
+        "Task uses more resources " + stringify(total) +
+        " than available " + stringify(offered));
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  // NOTE: The order in which the following validate functions are
+  // executed does matter! For example, 'validateResourceUsage'
+  // assumes that ExecutorInfo is valid which is verified by
+  // 'validateExecutorInfo'.
+  vector<lambda::function<Option<Error>(void)>> validators = {
+    lambda::bind(validateTaskID, task),
+    lambda::bind(validateUniqueTaskID, task, framework),
+    lambda::bind(validateSlaveID, task, slave),
+    lambda::bind(validateExecutorInfo, task, framework, slave),
+    lambda::bind(validateCheckpoint, framework, slave),
+    lambda::bind(validateResources, task),
+    lambda::bind(validateResourceUsage, task, framework, slave, offered)
+  };
+
+  // TODO(benh): Add a validateHealthCheck function.
+
+  // TODO(jieyu): Add a validateCommandInfo function.
+
+  foreach (const lambda::function<Option<Error>(void)>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
+} // namespace task {
+
+
+namespace offer {
+
+Offer* getOffer(Master* master, const OfferID& offerId)
+{
+  CHECK_NOTNULL(master);
+  return master->getOffer(offerId);
+}
+
+
+Slave* getSlave(Master* master, const SlaveID& slaveId)
+{
+  CHECK_NOTNULL(master);
+  return master->getSlave(slaveId);
+}
+
+
+// Validates that an offer only appears once in the offer list.
+Option<Error> validateUniqueOfferID(const RepeatedPtrField<OfferID>& offerIds)
+{
+  hashset<OfferID> offers;
+
+  foreach (const OfferID& offerId, offerIds) {
+    if (offers.contains(offerId)) {
+      return Error("Duplicate offer " + stringify(offerId) + " in offer list");
+    }
+
+    offers.insert(offerId);
+  }
+
+  return None();
+}
+
+
+// Validates that all offers belong to the expected framework.
+Option<Error> validateFramework(
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework)
+{
+  foreach (const OfferID& offerId, offerIds) {
+    Offer* offer = getOffer(master, offerId);
+    if (offer == NULL) {
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    }
+
+    if (framework->id != offer->framework_id()) {
+      return Error(
+          "Offer " + stringify(offer->id()) +
+          " has invalid framework " + stringify(offer->framework_id()) +
+          " while framework " + stringify(framework->id) + " is expected");
+    }
+  }
+
+  return None();
+}
+
+
+// Validates that all offers belong to the same valid slave.
+Option<Error> validateSlave(
+    const RepeatedPtrField<OfferID>& offerIds, Master* master)
+{
+  Option<SlaveID> slaveId;
+
+  foreach (const OfferID& offerId, offerIds) {
+    Offer* offer = getOffer(master, offerId);
+    if (offer == NULL) {
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    }
+
+    Slave* slave = getSlave(master, offer->slave_id());
+
+    // This is not possible because the offer should've been removed.
+    CHECK(slave != NULL)
+      << "Offer " << offerId
+      << " outlived slave " << offer->slave_id();
+
+    // This is not possible because the offer should've been removed.
+    CHECK(slave->connected)
+      << "Offer " << offerId
+      << " outlived disconnected slave " << *slave;
+
+    if (slaveId.isNone()) {
+      // Set slave id and use as base case for validation.
+      slaveId = slave->id;
+    }
+
+    if (slave->id != slaveId.get()) {
+      return Error(
+          "Aggregated offers must belong to one single slave. Offer " +
+          stringify(offerId) + " uses slave " +
+          stringify(slave->id) + " and slave " +
+          stringify(slaveId.get()));
+    }
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework)
+{
+  CHECK_NOTNULL(master);
+  CHECK_NOTNULL(framework);
+
+  vector<lambda::function<Option<Error>(void)>> validators = {
+    lambda::bind(validateUniqueOfferID, offerIds),
+    lambda::bind(validateFramework, offerIds, master, framework),
+    lambda::bind(validateSlave, offerIds, master)
+  };
+
+  foreach (const lambda::function<Option<Error>(void)>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
+} // namespace offer {
+
+
+namespace operation {
+
+// TODO(jieyu): Add validate functions for Offer operations.
+
+} // namespace operation {
+
+} // namespace validation {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b22d7add/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
new file mode 100644
index 0000000..642c375
--- /dev/null
+++ b/src/master/validation.hpp
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <google/protobuf/repeated_field.h>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+class Master;
+
+struct Framework;
+struct Slave;
+
+namespace validation {
+
+namespace resource {
+
+// Validates resources specified by frameworks.
+// NOTE: We cannot take 'Resources' here because invalid resources are
+// silently ignored within its constructor.
+Option<Error> validate(
+    const google::protobuf::RepeatedPtrField<Resource>& resources);
+
+} // namespace resource {
+
+
+namespace task {
+
+// Validates a task that a framework attempts to launch within the
+// offered resources. Returns an optional error which will cause the
+// master to send a failed status update back to the framework.
+// NOTE: This function must be called sequentially for each task, and
+// each task needs to be launched before the next can be validated.
+Option<Error> validate(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered);
+
+} // namespace task {
+
+
+namespace offer {
+
+// NOTE: These two functions are placed in the header file because we
+// need to declare them as friends of Master.
+Offer* getOffer(Master* master, const OfferID& offerId);
+Slave* getSlave(Master* master, const SlaveID& slaveId);
+
+
+// Validates the given offers.
+Option<Error> validate(
+    const google::protobuf::RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework);
+
+} // namespace offer {
+
+
+namespace operation {
+
+// TODO(jieyu): Add validate functions for Offer operations.
+
+} // namespace operation {
+
+} // namespace validation {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {