You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/08/23 03:06:27 UTC

[1/3] mesos git commit: Added master validations for TaskGroup.

Repository: mesos
Updated Branches:
  refs/heads/master 4000fd31b -> 06c0e3e6e


Added master validations for TaskGroup.

The TaskGroup is considered invalid if any of the tasks or the executor
is invalid.

Review: https://reviews.apache.org/r/51270


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06c0e3e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06c0e3e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06c0e3e6

Branch: refs/heads/master
Commit: 06c0e3e6ecd7fa526ba69886340d41882dbdf0a9
Parents: fb58d38
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Aug 15 16:10:04 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700

----------------------------------------------------------------------
 src/master/validation.cpp             | 168 +++++++++++++++++++++++++++++
 src/master/validation.hpp             |  35 +++++-
 src/tests/master_validation_tests.cpp | 155 ++++++++++++++++++++++++++
 3 files changed, 357 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 803aa48..30319e7 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -972,6 +972,174 @@ Option<Error> validate(
   return None();
 }
 
+namespace group {
+
+namespace internal {
+
+Option<Error> validateTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  // Do the general validation first.
+  Option<Error> error = task::internal::validateTask(task, framework, slave);
+  if (error.isSome()) {
+    return error;
+  }
+
+  // Now do `TaskGroup` specific validation.
+  if (task.has_executor()) {
+    return Error("'TaskInfo.executor' must not be set");
+  }
+
+  return None();
+}
+
+
+Option<Error> validateTaskGroupAndExecutorResources(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executor)
+{
+  Resources total = executor.resources();
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    total += task.resources();
+  }
+
+  Option<Error> error = resource::validateUniquePersistenceID(total);
+  if (error.isSome()) {
+    return Error("Task group and executor use duplicate persistence ID: " +
+                 error->message);
+  }
+
+  error = resource::validateRevocableAndNonRevocableResources(total);
+  if (error.isSome()) {
+    return Error("Task group and executor mix revocable and non-revocable"
+                 " resources: " + error->message);
+  }
+
+  return None();
+}
+
+
+Option<Error> validateExecutor(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executor,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  // Do the general validation first.
+  Option<Error> error =
+    executor::internal::validate(executor, framework, slave);
+
+  if (error.isSome()) {
+    return error;
+  }
+
+  // Now do `TaskGroup` specific validation.
+
+  if (!executor.has_type()) {
+    return Error("'ExecutorInfo.type' must be set");
+  }
+
+
+  if (executor.type() == ExecutorInfo::UNKNOWN) {
+    return Error("Unknown executor type");
+  }
+
+  const Resources& executorResources = executor.resources();
+
+  // Validate minimal cpus and memory resources of executor.
+  Option<double> cpus =  executorResources.cpus();
+  if (cpus.isNone() || cpus.get() < MIN_CPUS) {
+    return Error(
+      "Executor '" + stringify(executor.executor_id()) +
+      "' uses less CPUs (" +
+      (cpus.isSome() ? stringify(cpus.get()) : "None") +
+      ") than the minimum required (" + stringify(MIN_CPUS) + ")");
+  }
+
+  Option<Bytes> mem = executorResources.mem();
+  if (mem.isNone() || mem.get() < MIN_MEM) {
+    return Error(
+      "Executor '" + stringify(executor.executor_id()) +
+      "' uses less memory (" +
+      (mem.isSome() ? stringify(mem.get().megabytes()) : "None") +
+      ") than the minimum required (" + stringify(MIN_MEM) + ")");
+  }
+
+  Option<Bytes> disk = executorResources.disk();
+  if (disk.isNone()) {
+    return Error(
+      "Executor '" + stringify(executor.executor_id()) + "' uses no disk");
+  }
+
+  // Validate combined resources of task group and executor.
+
+  // NOTE: This is refactored into a separate function so that it can
+  // be easily unit tested.
+  error = internal::validateTaskGroupAndExecutorResources(taskGroup, executor);
+  if (error.isSome()) {
+    return error;
+  }
+
+  Resources total;
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    total += task.resources();
+  }
+
+  if (!slave->hasExecutor(framework->id(), executor.executor_id())) {
+    total += executorResources;
+  }
+
+  if (!offered.contains(total)) {
+    return Error(
+        "Total resources " + stringify(total) + " required by task group and"
+        " its executor are more than available " + stringify(offered));
+  }
+
+  return None();
+}
+
+} // namespace internal {
+
+
+Option<Error> validate(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executor,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    Option<Error> error = internal::validateTask(task, framework, slave);
+    if (error.isSome()) {
+      return Error("Task '" + stringify(task.task_id()) + "' is invalid: " +
+                   error->message);
+    }
+  }
+
+  Option<Error> error =
+    internal::validateExecutor(taskGroup, executor, framework, slave, offered);
+
+  if (error.isSome()) {
+    return error;
+  }
+
+  return None();
+}
+
+} // namespace group {
+
 } // namespace task {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index f1df645..3205ee1 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -91,7 +91,8 @@ namespace task {
 
 // Validates a task that a framework attempts to launch within the
 // offered resources. Returns an optional error which will cause the
-// master to send a failed status update back to the framework.
+// master to send a `TASK_ERROR` status update back to the framework.
+//
 // NOTE: This function must be called sequentially for each task, and
 // each task needs to be launched before the next can be validated.
 Option<Error> validate(
@@ -118,6 +119,38 @@ Option<Error> validateHealthCheck(const TaskInfo& task);
 
 } // namespace internal {
 
+namespace group {
+
+// Validates a task group that a framework attempts to launch within the
+// offered resources. Returns an optional error which will cause the
+// master to send `TASK_ERROR` status updates for *all* the tasks in
+// the task group back to the framework.
+//
+// NOTE: Validation error of *any* task will cause all the tasks in the task
+// group to be rejected by the master.
+Option<Error> validate(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executor,
+    Framework* framework,
+    Slave* slave,
+    const Resources& offered);
+
+
+// Functions in this namespace are only exposed for testing.
+namespace internal {
+
+// Validates that the resources specified by
+// the task group and its executor are valid.
+//
+// TODO(vinod): Consolidate this with `validateTaskAndExecutorResources()`.
+Option<Error> validateTaskGroupAndExecutorResources(
+    const TaskGroupInfo& taskGroup,
+    const ExecutorInfo& executor);
+
+} // namespace internal {
+
+} // namespace group {
+
 } // namespace task {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 4800df1..e1a5030 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1450,6 +1450,161 @@ TEST_F(ExecutorValidationTest, ExecutorType)
   }
 }
 
+
+class TaskGroupValidationTest : public MesosTest {};
+
+
+// This test verifies that tasks in a task group cannot mix
+// revocable and non-revocable resources.
+TEST_F(TaskGroupValidationTest, TaskGroupUsesRevocableResources)
+{
+  TaskInfo task1;
+  task1.set_name("test1");
+  task1.mutable_task_id()->set_value("task1");
+  task1.mutable_slave_id()->set_value("slave");
+
+  TaskInfo task2;
+  task2.set_name("test2");
+  task2.mutable_task_id()->set_value("task2");
+  task2.mutable_slave_id()->set_value("slave");
+
+  ExecutorInfo executor = DEFAULT_EXECUTOR_INFO;
+
+  // Non-revocable cpus.
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Value::SCALAR);
+  cpus.mutable_scalar()->set_value(2);
+
+  // A task group with only non-revocable cpus is valid.
+  task1.add_resources()->CopyFrom(cpus);
+  task2.add_resources()->CopyFrom(cpus);
+
+  TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor));
+
+  // Revocable cpus.
+  Resource revocableCpus = cpus;
+  revocableCpus.mutable_revocable();
+
+  // A task group with only revocable cpus is valid.
+  task1.clear_resources();
+  task2.clear_resources();
+  task1.add_resources()->CopyFrom(revocableCpus);
+  task2.add_resources()->CopyFrom(revocableCpus);
+
+  taskGroup.clear_tasks();
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor));
+
+  // A task group with one task using revocable resources and another task
+  // using non-revocable cpus is invalid.
+  task1.clear_resources();
+  task2.clear_resources();
+  task1.add_resources()->CopyFrom(cpus);
+  task2.add_resources()->CopyFrom(revocableCpus);
+
+  taskGroup.clear_tasks();
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  EXPECT_SOME(task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor));
+}
+
+
+// This test verifies that tasks in a task group and executor
+// cannot mix revocable and non-revocable resources.
+TEST_F(TaskGroupValidationTest, TaskGroupAndExecutorUsesRevocableResources)
+{
+  TaskInfo task;
+  task.set_name("test1");
+  task.mutable_task_id()->set_value("task1");
+  task.mutable_slave_id()->set_value("slave");
+
+  ExecutorInfo executor = DEFAULT_EXECUTOR_INFO;
+
+  // Non-revocable cpus.
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Value::SCALAR);
+  cpus.mutable_scalar()->set_value(2);
+
+  // A task group and executor with only non-revocable cpus is valid.
+  task.add_resources()->CopyFrom(cpus);
+
+  TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  executor.add_resources()->CopyFrom(cpus);
+
+  EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor));
+
+  // Revocable cpus.
+  Resource revocableCpus = cpus;
+  revocableCpus.mutable_revocable();
+
+  // A task group and executor with only revocable cpus is valid.
+  task.clear_resources();
+  task.add_resources()->CopyFrom(revocableCpus);
+
+  taskGroup.clear_tasks();
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  executor.clear_resources();
+  executor.add_resources()->CopyFrom(revocableCpus);
+
+  EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor));
+
+  // A task group with the task using revocable resources and executor
+  // using non-revocable cpus is invalid.
+  task.clear_resources();
+  task.add_resources()->CopyFrom(revocableCpus);
+
+  taskGroup.clear_tasks();
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  executor.clear_resources();
+  executor.add_resources()->CopyFrom(cpus);
+
+  Option<Error> error =
+    task::group::internal::validateTaskGroupAndExecutorResources(
+        taskGroup, executor);
+
+  EXPECT_SOME(error);
+  EXPECT_TRUE(strings::contains(
+      error->message,
+      "Task group and executor mix revocable and non-revocable resources"));
+
+  // A task group with the task using non-revocable resources and executor
+  // using revocable cpus is invalid.
+  task.clear_resources();
+  task.add_resources()->CopyFrom(cpus);
+
+  taskGroup.clear_tasks();
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  executor.clear_resources();
+  executor.add_resources()->CopyFrom(revocableCpus);
+
+  error = task::group::internal::validateTaskGroupAndExecutorResources(
+      taskGroup, executor);
+
+  EXPECT_SOME(error);
+  EXPECT_TRUE(strings::contains(
+      error->message,
+      "Task group and executor mix revocable and non-revocable resources"));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[3/3] mesos git commit: Fixed indentation in master/validation.cpp.

Posted by vi...@apache.org.
Fixed indentation in master/validation.cpp.

Review: https://reviews.apache.org/r/51247


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47f2c218
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47f2c218
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47f2c218

Branch: refs/heads/master
Commit: 47f2c2187bffe79d9d05085f403b9087154d6618
Parents: 4000fd3
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Aug 15 16:12:26 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700

----------------------------------------------------------------------
 src/master/validation.cpp | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/47f2c218/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index ddc7ac3..19a63ea 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -534,8 +534,7 @@ Option<Error> validateTaskID(const TaskInfo& task)
 
 // Validates that the TaskID does not collide with any existing tasks
 // for the framework.
-Option<Error> validateUniqueTaskID(const TaskInfo& task, Framework*
-    framework)
+Option<Error> validateUniqueTaskID(const TaskInfo& task, Framework* framework)
 {
   const TaskID& taskId = task.task_id();
 
@@ -563,7 +562,9 @@ Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
 // Validates that tasks that use the "same" executor (i.e., same
 // ExecutorID) have an identical ExecutorInfo.
 Option<Error> validateExecutorInfo(
-    const TaskInfo& task, Framework* framework, Slave* slave)
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
 {
   if (task.has_executor() == task.has_command()) {
     return Error(
@@ -940,7 +941,8 @@ Option<Error> validateFramework(
 
 // Validates that all offers belong to the same valid slave.
 Option<Error> validateSlave(
-    const RepeatedPtrField<OfferID>& offerIds, Master* master)
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master)
 {
   Option<SlaveID> slaveId;
 


[2/3] mesos git commit: Refactored task validations in master.

Posted by vi...@apache.org.
Refactored task validations in master.

Refactored in such a way that most of the helper functions can be
reused for doing task group validation.

Note that there are no functional changes here, only code movement.

Review: https://reviews.apache.org/r/51248


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb58d38d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb58d38d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb58d38d

Branch: refs/heads/master
Commit: fb58d38d86b63804448455bcf2e356d75554e5e1
Parents: 47f2c21
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Aug 16 19:22:34 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700

----------------------------------------------------------------------
 src/master/validation.cpp             | 475 +++++++++++++++++++----------
 src/master/validation.hpp             |  17 +-
 src/tests/master_validation_tests.cpp | 109 ++++++-
 3 files changed, 438 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 19a63ea..803aa48 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -470,6 +470,25 @@ Option<Error> validateUniquePersistenceID(
 }
 
 
+// Validates that revocable and non-revocable
+// resources of the same name do not exist.
+//
+// TODO(vinod): Is this the right place to do this?
+Option<Error> validateRevocableAndNonRevocableResources(
+    const Resources& _resources)
+{
+  foreach (const string& name, _resources.names()) {
+    Resources resources = _resources.get(name);
+    if (!resources.revocable().empty() && resources != resources.revocable()) {
+      return Error("Cannot use both revocable and non-revocable '" + name +
+                   "' at the same time");
+    }
+  }
+
+  return None();
+}
+
+
 // Validates that all the given resources are persistent volumes.
 Option<Error> validatePersistentVolume(
     const RepeatedPtrField<Resource>& volumes)
@@ -514,6 +533,161 @@ Option<Error> validate(const RepeatedPtrField<Resource>& resources)
 } // namespace resource {
 
 
+namespace executor {
+namespace internal {
+
+Option<Error> validateType(const ExecutorInfo& executor)
+{
+  switch (executor.type()) {
+    case ExecutorInfo::DEFAULT:
+      if (executor.has_command()) {
+        return Error(
+            "'ExecutorInfo.command' must not be set for 'DEFAULT' executor");
+      }
+      break;
+
+    case ExecutorInfo::CUSTOM:
+      if (!executor.has_command()) {
+        return Error(
+            "'ExecutorInfo.command' must be set for 'CUSTOM' executor");
+      }
+      break;
+
+    case ExecutorInfo::UNKNOWN:
+      // This could happen if a new executor type is introduced in the
+      // protos but the master doesn't know about it yet (e.g., new
+      // scheduler launches new type of executor on an old master).
+      return None();
+  }
+
+  return None();
+}
+
+
+Option<Error> validateCompatibleExecutorInfo(
+    const ExecutorInfo& executor,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  const ExecutorID& executorId = executor.executor_id();
+  Option<ExecutorInfo> executorInfo = None();
+
+  if (slave->hasExecutor(framework->id(), executorId)) {
+    executorInfo =
+      slave->executors.at(framework->id()).at(executorId);
+  }
+
+  if (executorInfo.isSome() && !(executor == executorInfo.get())) {
+    return Error(
+        "ExecutorInfo is not compatible with existing ExecutorInfo"
+        " with same ExecutorID).\n"
+        "------------------------------------------------------------\n"
+        "Existing ExecutorInfo:\n" +
+        stringify(executorInfo.get()) + "\n"
+        "------------------------------------------------------------\n"
+        "ExecutorInfo:\n" +
+        stringify(executor) + "\n"
+        "------------------------------------------------------------\n");
+  }
+
+  return None();
+}
+
+
+Option<Error> validateFrameworkID(
+    const ExecutorInfo& executor,
+    Framework* framework)
+{
+  CHECK_NOTNULL(framework);
+
+  // Master ensures `ExecutorInfo.framework_id`
+  // is set before calling this method.
+  CHECK(executor.has_framework_id());
+
+  if (executor.framework_id() != framework->id()) {
+    return Error(
+        "ExecutorInfo has an invalid FrameworkID"
+        " (Actual: " + stringify(executor.framework_id()) +
+        " vs Expected: " + stringify(framework->id()) + ")");
+  }
+
+  return None();
+}
+
+
+Option<Error> validateShutdownGracePeriod(const ExecutorInfo& executor)
+{
+  // Make sure provided duration is non-negative.
+  if (executor.has_shutdown_grace_period() &&
+      Nanoseconds(executor.shutdown_grace_period().nanoseconds()) <
+        Duration::zero()) {
+    return Error(
+        "ExecutorInfo's 'shutdown_grace_period' must be non-negative");
+  }
+
+  return None();
+}
+
+
+Option<Error> validateResources(const ExecutorInfo& executor)
+{
+  Option<Error> error = resource::validate(executor.resources());
+  if (error.isSome()) {
+    return Error("Executor uses invalid resources: " + error->message);
+  }
+
+  const Resources& resources = executor.resources();
+
+  error = resource::validateUniquePersistenceID(resources);
+  if (error.isSome()) {
+    return Error(
+        "Executor uses duplicate persistence ID: " + error->message);
+  }
+
+  error = resource::validateRevocableAndNonRevocableResources(resources);
+  if (error.isSome()) {
+    return Error("Executor mixes revocable and non-revocable resources: " +
+                 error->message);
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const ExecutorInfo& executor,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  vector<lambda::function<Option<Error>()>> validators = {
+    lambda::bind(internal::validateType, executor),
+    lambda::bind(internal::validateFrameworkID, executor, framework),
+    lambda::bind(internal::validateShutdownGracePeriod, executor),
+    lambda::bind(internal::validateResources, executor),
+    lambda::bind(
+        internal::validateCompatibleExecutorInfo, executor, framework, slave)
+  };
+
+  foreach (const lambda::function<Option<Error>()>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
+} // namespace internal {
+} // namespace executor {
+
+
 namespace task {
 
 namespace internal {
@@ -559,73 +733,115 @@ Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
 }
 
 
-// Validates that tasks that use the "same" executor (i.e., same
-// ExecutorID) have an identical ExecutorInfo.
-Option<Error> validateExecutorInfo(
-    const TaskInfo& task,
-    Framework* framework,
-    Slave* slave)
+Option<Error> validateKillPolicy(const TaskInfo& task)
 {
-  if (task.has_executor() == task.has_command()) {
-    return Error(
-        "Task should have at least one (but not both) of CommandInfo or "
-        "ExecutorInfo present");
+  if (task.has_kill_policy() &&
+      task.kill_policy().has_grace_period() &&
+      Nanoseconds(task.kill_policy().grace_period().nanoseconds()) <
+        Duration::zero()) {
+    return Error("Task's 'kill_policy.grace_period' must be non-negative");
   }
 
-  if (task.has_executor()) {
-    // Master ensures `ExecutorInfo.framework_id` is set before calling
-    // this method.
-    CHECK(task.executor().has_framework_id());
+  return None();
+}
 
-    if (task.executor().framework_id() != framework->id()) {
-      return Error(
-          "ExecutorInfo has an invalid FrameworkID"
-          " (Actual: " + stringify(task.executor().framework_id()) +
-          " vs Expected: " + stringify(framework->id()) + ")");
+
+Option<Error> validateHealthCheck(const TaskInfo& task)
+{
+  if (task.has_health_check()) {
+    Option<Error> error = health::validation::healthCheck(task.health_check());
+    if (error.isSome()) {
+      return Error("Task uses invalid health check: " + error->message);
     }
+  }
 
+  return None();
+}
 
-    // TODO(vinod): Revisit these when `TaskGroup` validation is added
-    // (MESOS-6042).
 
-    if (task.executor().has_type() &&
-        task.executor().type() != ExecutorInfo::CUSTOM) {
-      return Error("'ExecutorInfo.type' must be 'CUSTOM'");
-    }
+Option<Error> validateResources(const TaskInfo& task)
+{
+  if (task.resources().empty()) {
+    return Error("Task uses no resources");
+  }
 
-    // While `ExecutorInfo.command` is optional in the protobuf,
-    // semantically it is still required for backwards compatibility.
-    if (!task.executor().has_command()) {
-      return Error("'ExecutorInfo.command' must be set");
-    }
+  Option<Error> error = resource::validate(task.resources());
+  if (error.isSome()) {
+    return Error("Task uses invalid resources: " + error->message);
+  }
 
-    const ExecutorID& executorId = task.executor().executor_id();
-    Option<ExecutorInfo> executorInfo = None();
+  const Resources& resources = task.resources();
 
-    if (slave->hasExecutor(framework->id(), executorId)) {
-      executorInfo =
-        slave->executors.get(framework->id()).get().get(executorId);
-    }
+  error = resource::validateUniquePersistenceID(resources);
+  if (error.isSome()) {
+    return Error("Task uses duplicate persistence ID: " + error->message);
+  }
 
-    if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
-      return Error(
-          "Task has invalid ExecutorInfo (existing ExecutorInfo"
-          " with same ExecutorID is not compatible).\n"
-          "------------------------------------------------------------\n"
-          "Existing ExecutorInfo:\n" +
-          stringify(executorInfo.get()) + "\n"
-          "------------------------------------------------------------\n"
-          "Task's ExecutorInfo:\n" +
-          stringify(task.executor()) + "\n"
-          "------------------------------------------------------------\n");
-    }
+  error = resource::validateRevocableAndNonRevocableResources(resources);
+  if (error.isSome()) {
+    return Error("Task mixes revocable and non-revocable resources: " +
+                 error->message);
+  }
 
-    // Make sure provided duration is non-negative.
-    if (task.executor().has_shutdown_grace_period() &&
-        Nanoseconds(task.executor().shutdown_grace_period().nanoseconds()) <
-          Duration::zero()) {
-      return Error(
-          "ExecutorInfo's 'shutdown_grace_period' must be non-negative");
+  return None();
+}
+
+
+Option<Error> validateTaskAndExecutorResources(const TaskInfo& task)
+{
+  Resources total = task.resources();
+  if (task.has_executor()) {
+    total += task.executor().resources();
+  }
+
+  Option<Error> error = resource::validate(total);
+  if (error.isSome()) {
+    return Error(
+        "Task and its executor use invalid resources: " + error->message);
+  }
+
+  error = resource::validateUniquePersistenceID(total);
+  if (error.isSome()) {
+    return Error("Task and its executor use duplicate persistence ID: " +
+                 error->message);
+  }
+
+  error = resource::validateRevocableAndNonRevocableResources(total);
+  if (error.isSome()) {
+    return Error("Task and its executor mix revocable and non-revocable"
+                 " resources: " + error->message);
+  }
+
+  return None();
+}
+
+
+// Validates task specific fields except its executor (if it exists).
+Option<Error> validateTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+
+  // NOTE: The order in which the following validate functions are
+  // executed does matter!
+  vector<lambda::function<Option<Error>()>> validators = {
+    lambda::bind(internal::validateTaskID, task),
+    lambda::bind(internal::validateUniqueTaskID, task, framework),
+    lambda::bind(internal::validateSlaveID, task, slave),
+    lambda::bind(internal::validateKillPolicy, task),
+    lambda::bind(internal::validateHealthCheck, task),
+    lambda::bind(internal::validateResources, task)
+  };
+
+  // TODO(jieyu): Add a validateCommandInfo function.
+
+  foreach (const lambda::function<Option<Error>()>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
     }
   }
 
@@ -633,31 +849,54 @@ Option<Error> validateExecutorInfo(
 }
 
 
-// Validates that the task and the executor are using proper amount of
-// resources. For instance, the used resources by a task on a slave
-// should not exceed the total resources offered on that slave.
-Option<Error> validateResourceUsage(
+// Validates `TaskInfo.executor` if it exists.
+Option<Error> validateExecutor(
     const TaskInfo& task,
     Framework* framework,
     Slave* slave,
     const Resources& offered)
 {
-  Resources taskResources = task.resources();
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
 
-  if (taskResources.empty()) {
-    return Error("Task uses no resources");
+  if (task.has_executor() == task.has_command()) {
+    return Error(
+        "Task should have at least one (but not both) of CommandInfo or "
+        "ExecutorInfo present");
   }
 
-  Resources executorResources;
-  if (task.has_executor()) {
-    executorResources = task.executor().resources();
-  }
+  Resources total = task.resources();
+
+  Option<Error> error = None();
 
-  // Validate minimal cpus and memory resources of executor and log
-  // warnings if not set.
   if (task.has_executor()) {
+    const ExecutorInfo& executor = task.executor();
+
+    // Do the general validation first.
+    error = executor::internal::validate(executor, framework, slave);
+    if (error.isSome()) {
+      return error;
+    }
+
+    // Now do specific validation when an executor is specified on `Task`.
+
+    // TODO(vinod): Revisit this when we allow schedulers to explicitly
+    // specify 'DEFAULT' executor in the `LAUNCH` operation.
+
+    if (executor.has_type() && executor.type() != ExecutorInfo::CUSTOM) {
+      return Error("'ExecutorInfo.type' must be 'CUSTOM'");
+    }
+
+    // While `ExecutorInfo.command` is optional in the protobuf,
+    // semantically it is still required for backwards compatibility.
+    if (!executor.has_command()) {
+      return Error("'ExecutorInfo.command' must be set");
+    }
+
     // TODO(martin): MESOS-1807. Return Error instead of logging a
-    // warning in 0.22.0.
+    // warning.
+    const Resources& executorResources = executor.resources();
+
     Option<double> cpus =  executorResources.cpus();
     if (cpus.isNone() || cpus.get() < MIN_CPUS) {
       LOG(WARNING)
@@ -681,85 +920,25 @@ Option<Error> validateResourceUsage(
         << "). Please update your executor, as this will be mandatory "
         << "in future releases.";
     }
-  }
-
-  // Validate if resources needed by the task (and its executor in
-  // case the executor is new) are available.
-  Resources total = taskResources;
-  if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
-    total += executorResources;
-  }
-
-  if (!offered.contains(total)) {
-    return Error(
-        "Task uses more resources " + stringify(total) +
-        " than available " + stringify(offered));
-  }
-
-  return None();
-}
-
-
-// Validates that the resources specified by the task are valid.
-Option<Error> validateResources(const TaskInfo& task)
-{
-  Option<Error> error = resource::validate(task.resources());
-  if (error.isSome()) {
-    return Error("Task uses invalid resources: " + error.get().message);
-  }
-
-  Resources total = task.resources();
 
-  if (task.has_executor()) {
-    error = resource::validate(task.executor().resources());
-    if (error.isSome()) {
-      return Error("Executor uses invalid resources: " + error.get().message);
+    if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
+      total += executorResources;
     }
-
-    total += task.executor().resources();
   }
 
-  // A task and its executor can either use non-revocable resources
-  // or revocable resources of a given name but not both.
-  foreach (const string& name, total.names()) {
-    Resources resources = total.get(name);
-    if (!resources.revocable().empty() &&
-        resources != resources.revocable()) {
-      return Error("Task (and its executor, if exists) uses both revocable and"
-                   " non-revocable " + name);
-    }
-  }
+  // Now validate combined resources of task and executor.
 
-  error = resource::validateUniquePersistenceID(total);
+  // NOTE: This is refactored into a separate function
+  // so that it can be easily unit tested.
+  error = task::internal::validateTaskAndExecutorResources(task);
   if (error.isSome()) {
     return error;
   }
 
-  return None();
-}
-
-
-Option<Error> validateKillPolicy(const TaskInfo& task)
-{
-  if (task.has_kill_policy() &&
-      task.kill_policy().has_grace_period() &&
-      Nanoseconds(task.kill_policy().grace_period().nanoseconds()) <
-        Duration::zero()) {
-    return Error("Task's 'kill_policy.grace_period' must be non-negative");
-  }
-
-  return None();
-}
-
-
-Option<Error> validateHealthCheck(const TaskInfo& task)
-{
-  if (task.has_health_check()) {
-    Option<Error> error = health::validation::healthCheck(task.health_check());
-
-    if (error.isSome()) {
-      return Error("Task uses invalid health check: " + error.get().message);
-    }
+  if (!offered.contains(total)) {
+    return Error(
+        "Total resources " + stringify(total) + " required by task and its"
+        " executor is more than available " + stringify(offered));
   }
 
   return None();
@@ -768,6 +947,7 @@ Option<Error> validateHealthCheck(const TaskInfo& task)
 } // namespace internal {
 
 
+// Validate task and its executor (if it exists).
 Option<Error> validate(
     const TaskInfo& task,
     Framework* framework,
@@ -777,26 +957,11 @@ Option<Error> validate(
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
 
-  // NOTE: The order in which the following validate functions are
-  // executed does matter! For example, 'validateResourceUsage'
-  // assumes that ExecutorInfo is valid which is verified by
-  // 'validateExecutorInfo'.
   vector<lambda::function<Option<Error>()>> validators = {
-    lambda::bind(internal::validateTaskID, task),
-    lambda::bind(internal::validateUniqueTaskID, task, framework),
-    lambda::bind(internal::validateSlaveID, task, slave),
-    lambda::bind(internal::validateExecutorInfo, task, framework, slave),
-    lambda::bind(internal::validateResources, task),
-    lambda::bind(internal::validateKillPolicy, task),
-    lambda::bind(internal::validateHealthCheck, task),
-    lambda::bind(
-        internal::validateResourceUsage, task, framework, slave, offered)
+    lambda::bind(internal::validateTask, task, framework, slave),
+    lambda::bind(internal::validateExecutor, task, framework, slave, offered)
   };
 
-  // TODO(benh): Add a validateHealthCheck function.
-
-  // TODO(jieyu): Add a validateCommandInfo function.
-
   foreach (const lambda::function<Option<Error>()>& validator, validators) {
     Option<Error> error = validator();
     if (error.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index fd00609..f1df645 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -75,6 +75,18 @@ Option<Error> validate(
 } // namespace resource {
 
 
+namespace executor {
+
+// Functions in this namespace are only exposed for testing.
+namespace internal {
+
+// Validates that fields are properly set depending on the type of the executor.
+Option<Error> validateType(const ExecutorInfo& executor);
+
+} // namespace internal {
+} // namespace executor {
+
+
 namespace task {
 
 // Validates a task that a framework attempts to launch within the
@@ -92,9 +104,12 @@ Option<Error> validate(
 // Functions in this namespace are only exposed for testing.
 namespace internal {
 
-// Validates resources of the task and executor (if present).
+// Validates resources of the task.
 Option<Error> validateResources(const TaskInfo& task);
 
+// Validates resources of the task and its executor.
+Option<Error> validateTaskAndExecutorResources(const TaskInfo& task);
+
 // Validates the kill policy of the task.
 Option<Error> validateKillPolicy(const TaskInfo& task);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index ad89812..4800df1 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -702,7 +702,6 @@ TEST_F(TaskValidationTest, TaskUsesCommandInfoAndExecutorInfo)
 }
 
 
-// TODO(vinod): Revisit this test after `TaskGroup` validation is implemented.
 TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -719,6 +718,8 @@ TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   // Create an executor without command info.
+  // Note that we don't set type as 'CUSTOM' because it is not
+  // required for `LAUNCH` operation.
   ExecutorInfo executor;
   executor.mutable_executor_id()->set_value("default");
 
@@ -742,6 +743,49 @@ TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
 }
 
 
+// This test verifies that a scheduler cannot explicitly specify
+// a 'DEFAULT' executor when using `LAUNCH` operation.
+// TODO(vinod): Revisit this when the above is allowed.
+TEST_F(TaskValidationTest, TaskUsesDefaultExecutor)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Create a 'DEFAULT' executor.
+  ExecutorInfo executor;
+  executor.set_type(ExecutorInfo::DEFAULT);
+  executor.mutable_executor_id()->set_value("default");
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(executor, 1, 1, 16, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_ERROR, status->state());
+  EXPECT_TRUE(strings::startsWith(
+      status->message(), "'ExecutorInfo.type' must be 'CUSTOM'"));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_F(TaskValidationTest, TaskUsesNoResources)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -842,7 +886,7 @@ TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered)
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_TRUE(strings::contains(
-      status.get().message(), "Task uses more resources"));
+      status.get().message(), "more than available"));
 
   driver.stop();
   driver.join();
@@ -1013,7 +1057,7 @@ TEST_F(TaskValidationTest, ExecutorInfoDiffersOnSameSlave)
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_TRUE(strings::contains(
-      status.get().message(), "Task has invalid ExecutorInfo"));
+      status.get().message(), "ExecutorInfo is not compatible"));
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
@@ -1190,7 +1234,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
   task.add_resources()->CopyFrom(cpus);
   executor.add_resources()->CopyFrom(cpus);
   task.mutable_executor()->CopyFrom(executor);
-  EXPECT_NONE(task::internal::validateResources(task));
+  EXPECT_NONE(task::internal::validateTaskAndExecutorResources(task));
 
   // Revocable cpus.
   Resource revocableCpus = cpus;
@@ -1202,7 +1246,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
   executor.clear_resources();
   executor.add_resources()->CopyFrom(revocableCpus);
   task.mutable_executor()->CopyFrom(executor);
-  EXPECT_NONE(task::internal::validateResources(task));
+  EXPECT_NONE(task::internal::validateTaskAndExecutorResources(task));
 
   // A task with revocable cpus and its executor with non-revocable
   // cpus is invalid.
@@ -1211,7 +1255,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
   executor.clear_resources();
   executor.add_resources()->CopyFrom(cpus);
   task.mutable_executor()->CopyFrom(executor);
-  EXPECT_SOME(task::internal::validateResources(task));
+  EXPECT_SOME(task::internal::validateTaskAndExecutorResources(task));
 
   // A task with non-revocable cpus and its executor with
   // revocable cpus is invalid.
@@ -1220,7 +1264,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
   executor.clear_resources();
   executor.add_resources()->CopyFrom(revocableCpus);
   task.mutable_executor()->CopyFrom(executor);
-  EXPECT_SOME(task::internal::validateResources(task));
+  EXPECT_SOME(task::internal::validateTaskAndExecutorResources(task));
 }
 
 
@@ -1355,6 +1399,57 @@ TEST_F(TaskValidationTest, KillPolicyGracePeriodIsNonNegative)
 // TODO(benh): Add tests which launch multiple tasks and check for
 // aggregate resource usage.
 
+
+class ExecutorValidationTest : public MesosTest {};
+
+
+TEST_F(ExecutorValidationTest, ExecutorType)
+{
+  ExecutorInfo executorInfo;
+  executorInfo = DEFAULT_EXECUTOR_INFO;
+  executorInfo.mutable_framework_id()->set_value(UUID::random().toString());
+
+  {
+    // 'CUSTOM' executor with `CommandInfo` set is valid.
+    executorInfo.set_type(ExecutorInfo::CUSTOM);
+    executorInfo.mutable_command();
+
+    EXPECT_NONE(::executor::internal::validateType(executorInfo));
+  }
+
+  {
+    // 'CUSTOM' executor without `CommandInfo` set is invalid.
+    executorInfo.set_type(ExecutorInfo::CUSTOM);
+    executorInfo.clear_command();
+
+    Option<Error> error = ::executor::internal::validateType(executorInfo);
+    EXPECT_SOME(error);
+    EXPECT_TRUE(strings::contains(
+        error->message,
+        "'ExecutorInfo.command' must be set for 'CUSTOM' executor"));
+  }
+
+  {
+    // 'DEFAULT' executor without `CommandInfo` set is valid.
+    executorInfo.set_type(ExecutorInfo::DEFAULT);
+    executorInfo.clear_command();
+
+    EXPECT_NONE(::executor::internal::validateType(executorInfo));
+  }
+
+  {
+    // 'DEFAULT' executor with `CommandInfo` set is invalid.
+    executorInfo.set_type(ExecutorInfo::DEFAULT);
+    executorInfo.mutable_command();
+
+    Option<Error> error = ::executor::internal::validateType(executorInfo);
+    EXPECT_SOME(error);
+    EXPECT_TRUE(strings::contains(
+        error->message,
+        "'ExecutorInfo.command' must not be set for 'DEFAULT' executor"));
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {