You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/08/23 03:06:27 UTC
[1/3] mesos git commit: Added master validations for TaskGroup.
Repository: mesos
Updated Branches:
refs/heads/master 4000fd31b -> 06c0e3e6e
Added master validations for TaskGroup.
The TaskGroup is considered invalid if any of the tasks or the executor
is invalid.
Review: https://reviews.apache.org/r/51270
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06c0e3e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06c0e3e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06c0e3e6
Branch: refs/heads/master
Commit: 06c0e3e6ecd7fa526ba69886340d41882dbdf0a9
Parents: fb58d38
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Aug 15 16:10:04 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700
----------------------------------------------------------------------
src/master/validation.cpp | 168 +++++++++++++++++++++++++++++
src/master/validation.hpp | 35 +++++-
src/tests/master_validation_tests.cpp | 155 ++++++++++++++++++++++++++
3 files changed, 357 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 803aa48..30319e7 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -972,6 +972,174 @@ Option<Error> validate(
return None();
}
+namespace group {
+
+namespace internal {
+
+Option<Error> validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ // Do the general validation first.
+ Option<Error> error = task::internal::validateTask(task, framework, slave);
+ if (error.isSome()) {
+ return error;
+ }
+
+ // Now do `TaskGroup` specific validation.
+ if (task.has_executor()) {
+ return Error("'TaskInfo.executor' must not be set");
+ }
+
+ return None();
+}
+
+
+Option<Error> validateTaskGroupAndExecutorResources(
+ const TaskGroupInfo& taskGroup,
+ const ExecutorInfo& executor)
+{
+ Resources total = executor.resources();
+ foreach (const TaskInfo& task, taskGroup.tasks()) {
+ total += task.resources();
+ }
+
+ Option<Error> error = resource::validateUniquePersistenceID(total);
+ if (error.isSome()) {
+ return Error("Task group and executor use duplicate persistence ID: " +
+ error->message);
+ }
+
+ error = resource::validateRevocableAndNonRevocableResources(total);
+ if (error.isSome()) {
+ return Error("Task group and executor mix revocable and non-revocable"
+ " resources: " + error->message);
+ }
+
+ return None();
+}
+
+
+Option<Error> validateExecutor(
+ const TaskGroupInfo& taskGroup,
+ const ExecutorInfo& executor,
+ Framework* framework,
+ Slave* slave,
+ const Resources& offered)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ // Do the general validation first.
+ Option<Error> error =
+ executor::internal::validate(executor, framework, slave);
+
+ if (error.isSome()) {
+ return error;
+ }
+
+ // Now do `TaskGroup` specific validation.
+
+ if (!executor.has_type()) {
+ return Error("'ExecutorInfo.type' must be set");
+ }
+
+
+ if (executor.type() == ExecutorInfo::UNKNOWN) {
+ return Error("Unknown executor type");
+ }
+
+ const Resources& executorResources = executor.resources();
+
+ // Validate minimal cpus and memory resources of executor.
+ Option<double> cpus = executorResources.cpus();
+ if (cpus.isNone() || cpus.get() < MIN_CPUS) {
+ return Error(
+ "Executor '" + stringify(executor.executor_id()) +
+ "' uses less CPUs (" +
+ (cpus.isSome() ? stringify(cpus.get()) : "None") +
+ ") than the minimum required (" + stringify(MIN_CPUS) + ")");
+ }
+
+ Option<Bytes> mem = executorResources.mem();
+ if (mem.isNone() || mem.get() < MIN_MEM) {
+ return Error(
+ "Executor '" + stringify(executor.executor_id()) +
+ "' uses less memory (" +
+ (mem.isSome() ? stringify(mem.get().megabytes()) : "None") +
+ ") than the minimum required (" + stringify(MIN_MEM) + ")");
+ }
+
+ Option<Bytes> disk = executorResources.disk();
+ if (disk.isNone()) {
+ return Error(
+ "Executor '" + stringify(executor.executor_id()) + "' uses no disk");
+ }
+
+ // Validate combined resources of task group and executor.
+
+ // NOTE: This is refactored into a separate function so that it can
+ // be easily unit tested.
+ error = internal::validateTaskGroupAndExecutorResources(taskGroup, executor);
+ if (error.isSome()) {
+ return error;
+ }
+
+ Resources total;
+ foreach (const TaskInfo& task, taskGroup.tasks()) {
+ total += task.resources();
+ }
+
+ if (!slave->hasExecutor(framework->id(), executor.executor_id())) {
+ total += executorResources;
+ }
+
+ if (!offered.contains(total)) {
+ return Error(
+ "Total resources " + stringify(total) + " required by task group and"
+ " its executor are more than available " + stringify(offered));
+ }
+
+ return None();
+}
+
+} // namespace internal {
+
+
+Option<Error> validate(
+ const TaskGroupInfo& taskGroup,
+ const ExecutorInfo& executor,
+ Framework* framework,
+ Slave* slave,
+ const Resources& offered)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ foreach (const TaskInfo& task, taskGroup.tasks()) {
+ Option<Error> error = internal::validateTask(task, framework, slave);
+ if (error.isSome()) {
+ return Error("Task '" + stringify(task.task_id()) + "' is invalid: " +
+ error->message);
+ }
+ }
+
+ Option<Error> error =
+ internal::validateExecutor(taskGroup, executor, framework, slave, offered);
+
+ if (error.isSome()) {
+ return error;
+ }
+
+ return None();
+}
+
+} // namespace group {
+
} // namespace task {
http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index f1df645..3205ee1 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -91,7 +91,8 @@ namespace task {
// Validates a task that a framework attempts to launch within the
// offered resources. Returns an optional error which will cause the
-// master to send a failed status update back to the framework.
+// master to send a `TASK_ERROR` status update back to the framework.
+//
// NOTE: This function must be called sequentially for each task, and
// each task needs to be launched before the next can be validated.
Option<Error> validate(
@@ -118,6 +119,38 @@ Option<Error> validateHealthCheck(const TaskInfo& task);
} // namespace internal {
+namespace group {
+
+// Validates a task group that a framework attempts to launch within the
+// offered resources. Returns an optional error which will cause the
+// master to send `TASK_ERROR` status updates for *all* the tasks in
+// the task group back to the framework.
+//
+// NOTE: Validation error of *any* task will cause all the tasks in the task
+// group to be rejected by the master.
+Option<Error> validate(
+ const TaskGroupInfo& taskGroup,
+ const ExecutorInfo& executor,
+ Framework* framework,
+ Slave* slave,
+ const Resources& offered);
+
+
+// Functions in this namespace are only exposed for testing.
+namespace internal {
+
+// Validates that the resources specified by
+// the task group and its executor are valid.
+//
+// TODO(vinod): Consolidate this with `validateTaskAndExecutorResources()`.
+Option<Error> validateTaskGroupAndExecutorResources(
+ const TaskGroupInfo& taskGroup,
+ const ExecutorInfo& executor);
+
+} // namespace internal {
+
+} // namespace group {
+
} // namespace task {
http://git-wip-us.apache.org/repos/asf/mesos/blob/06c0e3e6/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 4800df1..e1a5030 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1450,6 +1450,161 @@ TEST_F(ExecutorValidationTest, ExecutorType)
}
}
+
+class TaskGroupValidationTest : public MesosTest {};
+
+
+// This test verifies that tasks in a task group cannot mix
+// revocable and non-revocable resources.
+TEST_F(TaskGroupValidationTest, TaskGroupUsesRevocableResources)
+{
+ TaskInfo task1;
+ task1.set_name("test1");
+ task1.mutable_task_id()->set_value("task1");
+ task1.mutable_slave_id()->set_value("slave");
+
+ TaskInfo task2;
+ task2.set_name("test2");
+ task2.mutable_task_id()->set_value("task2");
+ task2.mutable_slave_id()->set_value("slave");
+
+ ExecutorInfo executor = DEFAULT_EXECUTOR_INFO;
+
+ // Non-revocable cpus.
+ Resource cpus;
+ cpus.set_name("cpus");
+ cpus.set_type(Value::SCALAR);
+ cpus.mutable_scalar()->set_value(2);
+
+ // A task group with only non-revocable cpus is valid.
+ task1.add_resources()->CopyFrom(cpus);
+ task2.add_resources()->CopyFrom(cpus);
+
+ TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(task1);
+ taskGroup.add_tasks()->CopyFrom(task2);
+
+ EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor));
+
+ // Revocable cpus.
+ Resource revocableCpus = cpus;
+ revocableCpus.mutable_revocable();
+
+ // A task group with only revocable cpus is valid.
+ task1.clear_resources();
+ task2.clear_resources();
+ task1.add_resources()->CopyFrom(revocableCpus);
+ task2.add_resources()->CopyFrom(revocableCpus);
+
+ taskGroup.clear_tasks();
+ taskGroup.add_tasks()->CopyFrom(task1);
+ taskGroup.add_tasks()->CopyFrom(task2);
+
+ EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor));
+
+ // A task group with one task using revocable resources and another task
+ // using non-revocable cpus is invalid.
+ task1.clear_resources();
+ task2.clear_resources();
+ task1.add_resources()->CopyFrom(cpus);
+ task2.add_resources()->CopyFrom(revocableCpus);
+
+ taskGroup.clear_tasks();
+ taskGroup.add_tasks()->CopyFrom(task1);
+ taskGroup.add_tasks()->CopyFrom(task2);
+
+ EXPECT_SOME(task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor));
+}
+
+
+// This test verifies that tasks in a task group and executor
+// cannot mix revocable and non-revocable resources.
+TEST_F(TaskGroupValidationTest, TaskGroupAndExecutorUsesRevocableResources)
+{
+ TaskInfo task;
+ task.set_name("test1");
+ task.mutable_task_id()->set_value("task1");
+ task.mutable_slave_id()->set_value("slave");
+
+ ExecutorInfo executor = DEFAULT_EXECUTOR_INFO;
+
+ // Non-revocable cpus.
+ Resource cpus;
+ cpus.set_name("cpus");
+ cpus.set_type(Value::SCALAR);
+ cpus.mutable_scalar()->set_value(2);
+
+ // A task group and executor with only non-revocable cpus is valid.
+ task.add_resources()->CopyFrom(cpus);
+
+ TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(task);
+
+ executor.add_resources()->CopyFrom(cpus);
+
+ EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor));
+
+ // Revocable cpus.
+ Resource revocableCpus = cpus;
+ revocableCpus.mutable_revocable();
+
+ // A task group and executor with only revocable cpus is valid.
+ task.clear_resources();
+ task.add_resources()->CopyFrom(revocableCpus);
+
+ taskGroup.clear_tasks();
+ taskGroup.add_tasks()->CopyFrom(task);
+
+ executor.clear_resources();
+ executor.add_resources()->CopyFrom(revocableCpus);
+
+ EXPECT_NONE(task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor));
+
+ // A task group with the task using revocable resources and executor
+ // using non-revocable cpus is invalid.
+ task.clear_resources();
+ task.add_resources()->CopyFrom(revocableCpus);
+
+ taskGroup.clear_tasks();
+ taskGroup.add_tasks()->CopyFrom(task);
+
+ executor.clear_resources();
+ executor.add_resources()->CopyFrom(cpus);
+
+ Option<Error> error =
+ task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor);
+
+ EXPECT_SOME(error);
+ EXPECT_TRUE(strings::contains(
+ error->message,
+ "Task group and executor mix revocable and non-revocable resources"));
+
+ // A task group with the task using non-revocable resources and executor
+ // using revocable cpus is invalid.
+ task.clear_resources();
+ task.add_resources()->CopyFrom(cpus);
+
+ taskGroup.clear_tasks();
+ taskGroup.add_tasks()->CopyFrom(task);
+
+ executor.clear_resources();
+ executor.add_resources()->CopyFrom(revocableCpus);
+
+ error = task::group::internal::validateTaskGroupAndExecutorResources(
+ taskGroup, executor);
+
+ EXPECT_SOME(error);
+ EXPECT_TRUE(strings::contains(
+ error->message,
+ "Task group and executor mix revocable and non-revocable resources"));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[3/3] mesos git commit: Fixed indentation in master/validation.cpp.
Posted by vi...@apache.org.
Fixed indentation in master/validation.cpp.
Review: https://reviews.apache.org/r/51247
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47f2c218
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47f2c218
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47f2c218
Branch: refs/heads/master
Commit: 47f2c2187bffe79d9d05085f403b9087154d6618
Parents: 4000fd3
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Aug 15 16:12:26 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700
----------------------------------------------------------------------
src/master/validation.cpp | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/47f2c218/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index ddc7ac3..19a63ea 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -534,8 +534,7 @@ Option<Error> validateTaskID(const TaskInfo& task)
// Validates that the TaskID does not collide with any existing tasks
// for the framework.
-Option<Error> validateUniqueTaskID(const TaskInfo& task, Framework*
- framework)
+Option<Error> validateUniqueTaskID(const TaskInfo& task, Framework* framework)
{
const TaskID& taskId = task.task_id();
@@ -563,7 +562,9 @@ Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
// Validates that tasks that use the "same" executor (i.e., same
// ExecutorID) have an identical ExecutorInfo.
Option<Error> validateExecutorInfo(
- const TaskInfo& task, Framework* framework, Slave* slave)
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
{
if (task.has_executor() == task.has_command()) {
return Error(
@@ -940,7 +941,8 @@ Option<Error> validateFramework(
// Validates that all offers belong to the same valid slave.
Option<Error> validateSlave(
- const RepeatedPtrField<OfferID>& offerIds, Master* master)
+ const RepeatedPtrField<OfferID>& offerIds,
+ Master* master)
{
Option<SlaveID> slaveId;
[2/3] mesos git commit: Refactored task validations in master.
Posted by vi...@apache.org.
Refactored task validations in master.
Refactored in such a way that most of the helper functions can be
reused for doing task group validation.
Note that there are no functional changes here, only code movement.
Review: https://reviews.apache.org/r/51248
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb58d38d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb58d38d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb58d38d
Branch: refs/heads/master
Commit: fb58d38d86b63804448455bcf2e356d75554e5e1
Parents: 47f2c21
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Aug 16 19:22:34 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Aug 22 20:04:04 2016 -0700
----------------------------------------------------------------------
src/master/validation.cpp | 475 +++++++++++++++++++----------
src/master/validation.hpp | 17 +-
src/tests/master_validation_tests.cpp | 109 ++++++-
3 files changed, 438 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 19a63ea..803aa48 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -470,6 +470,25 @@ Option<Error> validateUniquePersistenceID(
}
+// Validates that revocable and non-revocable resources
+// of the same name are not used at the same time.
+//
+// TODO(vinod): Is this the right place to do this?
+Option<Error> validateRevocableAndNonRevocableResources(
+ const Resources& _resources)
+{
+ foreach (const string& name, _resources.names()) {
+ Resources resources = _resources.get(name);
+ if (!resources.revocable().empty() && resources != resources.revocable()) {
+ return Error("Cannot use both revocable and non-revocable '" + name +
+ "' at the same time");
+ }
+ }
+
+ return None();
+}
+
+
// Validates that all the given resources are persistent volumes.
Option<Error> validatePersistentVolume(
const RepeatedPtrField<Resource>& volumes)
@@ -514,6 +533,161 @@ Option<Error> validate(const RepeatedPtrField<Resource>& resources)
} // namespace resource {
+namespace executor {
+namespace internal {
+
+Option<Error> validateType(const ExecutorInfo& executor)
+{
+ switch (executor.type()) {
+ case ExecutorInfo::DEFAULT:
+ if (executor.has_command()) {
+ return Error(
+ "'ExecutorInfo.command' must not be set for 'DEFAULT' executor");
+ }
+ break;
+
+ case ExecutorInfo::CUSTOM:
+ if (!executor.has_command()) {
+ return Error(
+ "'ExecutorInfo.command' must be set for 'CUSTOM' executor");
+ }
+ break;
+
+ case ExecutorInfo::UNKNOWN:
+ // This could happen if a new executor type is introduced in the
+ // protos but the master doesn't know about it yet (e.g., new
+ // scheduler launches new type of executor on an old master).
+ return None();
+ }
+
+ return None();
+}
+
+
+Option<Error> validateCompatibleExecutorInfo(
+ const ExecutorInfo& executor,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ const ExecutorID& executorId = executor.executor_id();
+ Option<ExecutorInfo> executorInfo = None();
+
+ if (slave->hasExecutor(framework->id(), executorId)) {
+ executorInfo =
+ slave->executors.at(framework->id()).at(executorId);
+ }
+
+ if (executorInfo.isSome() && !(executor == executorInfo.get())) {
+ return Error(
+ "ExecutorInfo is not compatible with existing ExecutorInfo"
+ " with same ExecutorID).\n"
+ "------------------------------------------------------------\n"
+ "Existing ExecutorInfo:\n" +
+ stringify(executorInfo.get()) + "\n"
+ "------------------------------------------------------------\n"
+ "ExecutorInfo:\n" +
+ stringify(executor) + "\n"
+ "------------------------------------------------------------\n");
+ }
+
+ return None();
+}
+
+
+Option<Error> validateFrameworkID(
+ const ExecutorInfo& executor,
+ Framework* framework)
+{
+ CHECK_NOTNULL(framework);
+
+ // Master ensures `ExecutorInfo.framework_id`
+ // is set before calling this method.
+ CHECK(executor.has_framework_id());
+
+ if (executor.framework_id() != framework->id()) {
+ return Error(
+ "ExecutorInfo has an invalid FrameworkID"
+ " (Actual: " + stringify(executor.framework_id()) +
+ " vs Expected: " + stringify(framework->id()) + ")");
+ }
+
+ return None();
+}
+
+
+Option<Error> validateShutdownGracePeriod(const ExecutorInfo& executor)
+{
+ // Make sure provided duration is non-negative.
+ if (executor.has_shutdown_grace_period() &&
+ Nanoseconds(executor.shutdown_grace_period().nanoseconds()) <
+ Duration::zero()) {
+ return Error(
+ "ExecutorInfo's 'shutdown_grace_period' must be non-negative");
+ }
+
+ return None();
+}
+
+
+Option<Error> validateResources(const ExecutorInfo& executor)
+{
+ Option<Error> error = resource::validate(executor.resources());
+ if (error.isSome()) {
+ return Error("Executor uses invalid resources: " + error->message);
+ }
+
+ const Resources& resources = executor.resources();
+
+ error = resource::validateUniquePersistenceID(resources);
+ if (error.isSome()) {
+ return Error(
+ "Executor uses duplicate persistence ID: " + error->message);
+ }
+
+ error = resource::validateRevocableAndNonRevocableResources(resources);
+ if (error.isSome()) {
+ return Error("Executor mixes revocable and non-revocable resources: " +
+ error->message);
+ }
+
+ return None();
+}
+
+
+Option<Error> validate(
+ const ExecutorInfo& executor,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ vector<lambda::function<Option<Error>()>> validators = {
+ lambda::bind(internal::validateType, executor),
+ lambda::bind(internal::validateFrameworkID, executor, framework),
+ lambda::bind(internal::validateShutdownGracePeriod, executor),
+ lambda::bind(internal::validateResources, executor),
+ lambda::bind(
+ internal::validateCompatibleExecutorInfo, executor, framework, slave)
+ };
+
+ foreach (const lambda::function<Option<Error>()>& validator, validators) {
+ Option<Error> error = validator();
+ if (error.isSome()) {
+ return error;
+ }
+ }
+
+ return None();
+}
+
+} // namespace internal {
+} // namespace executor {
+
+
namespace task {
namespace internal {
@@ -559,73 +733,115 @@ Option<Error> validateSlaveID(const TaskInfo& task, Slave* slave)
}
-// Validates that tasks that use the "same" executor (i.e., same
-// ExecutorID) have an identical ExecutorInfo.
-Option<Error> validateExecutorInfo(
- const TaskInfo& task,
- Framework* framework,
- Slave* slave)
+Option<Error> validateKillPolicy(const TaskInfo& task)
{
- if (task.has_executor() == task.has_command()) {
- return Error(
- "Task should have at least one (but not both) of CommandInfo or "
- "ExecutorInfo present");
+ if (task.has_kill_policy() &&
+ task.kill_policy().has_grace_period() &&
+ Nanoseconds(task.kill_policy().grace_period().nanoseconds()) <
+ Duration::zero()) {
+ return Error("Task's 'kill_policy.grace_period' must be non-negative");
}
- if (task.has_executor()) {
- // Master ensures `ExecutorInfo.framework_id` is set before calling
- // this method.
- CHECK(task.executor().has_framework_id());
+ return None();
+}
- if (task.executor().framework_id() != framework->id()) {
- return Error(
- "ExecutorInfo has an invalid FrameworkID"
- " (Actual: " + stringify(task.executor().framework_id()) +
- " vs Expected: " + stringify(framework->id()) + ")");
+
+Option<Error> validateHealthCheck(const TaskInfo& task)
+{
+ if (task.has_health_check()) {
+ Option<Error> error = health::validation::healthCheck(task.health_check());
+ if (error.isSome()) {
+ return Error("Task uses invalid health check: " + error->message);
}
+ }
+ return None();
+}
- // TODO(vinod): Revisit these when `TaskGroup` validation is added
- // (MESOS-6042).
- if (task.executor().has_type() &&
- task.executor().type() != ExecutorInfo::CUSTOM) {
- return Error("'ExecutorInfo.type' must be 'CUSTOM'");
- }
+Option<Error> validateResources(const TaskInfo& task)
+{
+ if (task.resources().empty()) {
+ return Error("Task uses no resources");
+ }
- // While `ExecutorInfo.command` is optional in the protobuf,
- // semantically it is still required for backwards compatibility.
- if (!task.executor().has_command()) {
- return Error("'ExecutorInfo.command' must be set");
- }
+ Option<Error> error = resource::validate(task.resources());
+ if (error.isSome()) {
+ return Error("Task uses invalid resources: " + error->message);
+ }
- const ExecutorID& executorId = task.executor().executor_id();
- Option<ExecutorInfo> executorInfo = None();
+ const Resources& resources = task.resources();
- if (slave->hasExecutor(framework->id(), executorId)) {
- executorInfo =
- slave->executors.get(framework->id()).get().get(executorId);
- }
+ error = resource::validateUniquePersistenceID(resources);
+ if (error.isSome()) {
+ return Error("Task uses duplicate persistence ID: " + error->message);
+ }
- if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
- return Error(
- "Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible).\n"
- "------------------------------------------------------------\n"
- "Existing ExecutorInfo:\n" +
- stringify(executorInfo.get()) + "\n"
- "------------------------------------------------------------\n"
- "Task's ExecutorInfo:\n" +
- stringify(task.executor()) + "\n"
- "------------------------------------------------------------\n");
- }
+ error = resource::validateRevocableAndNonRevocableResources(resources);
+ if (error.isSome()) {
+ return Error("Task mixes revocable and non-revocable resources: " +
+ error->message);
+ }
- // Make sure provided duration is non-negative.
- if (task.executor().has_shutdown_grace_period() &&
- Nanoseconds(task.executor().shutdown_grace_period().nanoseconds()) <
- Duration::zero()) {
- return Error(
- "ExecutorInfo's 'shutdown_grace_period' must be non-negative");
+ return None();
+}
+
+
+Option<Error> validateTaskAndExecutorResources(const TaskInfo& task)
+{
+ Resources total = task.resources();
+ if (task.has_executor()) {
+ total += task.executor().resources();
+ }
+
+ Option<Error> error = resource::validate(total);
+ if (error.isSome()) {
+ return Error(
+ "Task and its executor use invalid resources: " + error->message);
+ }
+
+ error = resource::validateUniquePersistenceID(total);
+ if (error.isSome()) {
+ return Error("Task and its executor use duplicate persistence ID: " +
+ error->message);
+ }
+
+ error = resource::validateRevocableAndNonRevocableResources(total);
+ if (error.isSome()) {
+ return Error("Task and its executor mix revocable and non-revocable"
+ " resources: " + error->message);
+ }
+
+ return None();
+}
+
+
+// Validates task specific fields except its executor (if it exists).
+Option<Error> validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+
+ // NOTE: The order in which the following validate functions are
+ // executed does matter!
+ vector<lambda::function<Option<Error>()>> validators = {
+ lambda::bind(internal::validateTaskID, task),
+ lambda::bind(internal::validateUniqueTaskID, task, framework),
+ lambda::bind(internal::validateSlaveID, task, slave),
+ lambda::bind(internal::validateKillPolicy, task),
+ lambda::bind(internal::validateHealthCheck, task),
+ lambda::bind(internal::validateResources, task)
+ };
+
+ // TODO(jieyu): Add a validateCommandInfo function.
+
+ foreach (const lambda::function<Option<Error>()>& validator, validators) {
+ Option<Error> error = validator();
+ if (error.isSome()) {
+ return error;
}
}
@@ -633,31 +849,54 @@ Option<Error> validateExecutorInfo(
}
-// Validates that the task and the executor are using proper amount of
-// resources. For instance, the used resources by a task on a slave
-// should not exceed the total resources offered on that slave.
-Option<Error> validateResourceUsage(
+// Validates `TaskInfo.executor` if it exists.
+Option<Error> validateExecutor(
const TaskInfo& task,
Framework* framework,
Slave* slave,
const Resources& offered)
{
- Resources taskResources = task.resources();
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
- if (taskResources.empty()) {
- return Error("Task uses no resources");
+ if (task.has_executor() == task.has_command()) {
+ return Error(
+ "Task should have at least one (but not both) of CommandInfo or "
+ "ExecutorInfo present");
}
- Resources executorResources;
- if (task.has_executor()) {
- executorResources = task.executor().resources();
- }
+ Resources total = task.resources();
+
+ Option<Error> error = None();
- // Validate minimal cpus and memory resources of executor and log
- // warnings if not set.
if (task.has_executor()) {
+ const ExecutorInfo& executor = task.executor();
+
+ // Do the general validation first.
+ error = executor::internal::validate(executor, framework, slave);
+ if (error.isSome()) {
+ return error;
+ }
+
+ // Now do specific validation when an executor is specified on `Task`.
+
+ // TODO(vinod): Revisit this when we allow schedulers to explicitly
+ // specify 'DEFAULT' executor in the `LAUNCH` operation.
+
+ if (executor.has_type() && executor.type() != ExecutorInfo::CUSTOM) {
+ return Error("'ExecutorInfo.type' must be 'CUSTOM'");
+ }
+
+ // While `ExecutorInfo.command` is optional in the protobuf,
+ // semantically it is still required for backwards compatibility.
+ if (!executor.has_command()) {
+ return Error("'ExecutorInfo.command' must be set");
+ }
+
// TODO(martin): MESOS-1807. Return Error instead of logging a
- // warning in 0.22.0.
+ // warning.
+ const Resources& executorResources = executor.resources();
+
Option<double> cpus = executorResources.cpus();
if (cpus.isNone() || cpus.get() < MIN_CPUS) {
LOG(WARNING)
@@ -681,85 +920,25 @@ Option<Error> validateResourceUsage(
<< "). Please update your executor, as this will be mandatory "
<< "in future releases.";
}
- }
-
- // Validate if resources needed by the task (and its executor in
- // case the executor is new) are available.
- Resources total = taskResources;
- if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
- total += executorResources;
- }
-
- if (!offered.contains(total)) {
- return Error(
- "Task uses more resources " + stringify(total) +
- " than available " + stringify(offered));
- }
-
- return None();
-}
-
-
-// Validates that the resources specified by the task are valid.
-Option<Error> validateResources(const TaskInfo& task)
-{
- Option<Error> error = resource::validate(task.resources());
- if (error.isSome()) {
- return Error("Task uses invalid resources: " + error.get().message);
- }
-
- Resources total = task.resources();
- if (task.has_executor()) {
- error = resource::validate(task.executor().resources());
- if (error.isSome()) {
- return Error("Executor uses invalid resources: " + error.get().message);
+ if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
+ total += executorResources;
}
-
- total += task.executor().resources();
}
- // A task and its executor can either use non-revocable resources
- // or revocable resources of a given name but not both.
- foreach (const string& name, total.names()) {
- Resources resources = total.get(name);
- if (!resources.revocable().empty() &&
- resources != resources.revocable()) {
- return Error("Task (and its executor, if exists) uses both revocable and"
- " non-revocable " + name);
- }
- }
+ // Now validate combined resources of task and executor.
- error = resource::validateUniquePersistenceID(total);
+ // NOTE: This is refactored into a separate function
+ // so that it can be easily unit tested.
+ error = task::internal::validateTaskAndExecutorResources(task);
if (error.isSome()) {
return error;
}
- return None();
-}
-
-
-Option<Error> validateKillPolicy(const TaskInfo& task)
-{
- if (task.has_kill_policy() &&
- task.kill_policy().has_grace_period() &&
- Nanoseconds(task.kill_policy().grace_period().nanoseconds()) <
- Duration::zero()) {
- return Error("Task's 'kill_policy.grace_period' must be non-negative");
- }
-
- return None();
-}
-
-
-Option<Error> validateHealthCheck(const TaskInfo& task)
-{
- if (task.has_health_check()) {
- Option<Error> error = health::validation::healthCheck(task.health_check());
-
- if (error.isSome()) {
- return Error("Task uses invalid health check: " + error.get().message);
- }
+ if (!offered.contains(total)) {
+ return Error(
+ "Total resources " + stringify(total) + " required by task and its"
+ " executor is more than available " + stringify(offered));
}
return None();
@@ -768,6 +947,7 @@ Option<Error> validateHealthCheck(const TaskInfo& task)
} // namespace internal {
+// Validate task and its executor (if it exists).
Option<Error> validate(
const TaskInfo& task,
Framework* framework,
@@ -777,26 +957,11 @@ Option<Error> validate(
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
- // NOTE: The order in which the following validate functions are
- // executed does matter! For example, 'validateResourceUsage'
- // assumes that ExecutorInfo is valid which is verified by
- // 'validateExecutorInfo'.
vector<lambda::function<Option<Error>()>> validators = {
- lambda::bind(internal::validateTaskID, task),
- lambda::bind(internal::validateUniqueTaskID, task, framework),
- lambda::bind(internal::validateSlaveID, task, slave),
- lambda::bind(internal::validateExecutorInfo, task, framework, slave),
- lambda::bind(internal::validateResources, task),
- lambda::bind(internal::validateKillPolicy, task),
- lambda::bind(internal::validateHealthCheck, task),
- lambda::bind(
- internal::validateResourceUsage, task, framework, slave, offered)
+ lambda::bind(internal::validateTask, task, framework, slave),
+ lambda::bind(internal::validateExecutor, task, framework, slave, offered)
};
- // TODO(benh): Add a validateHealthCheck function.
-
- // TODO(jieyu): Add a validateCommandInfo function.
-
foreach (const lambda::function<Option<Error>()>& validator, validators) {
Option<Error> error = validator();
if (error.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index fd00609..f1df645 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -75,6 +75,18 @@ Option<Error> validate(
} // namespace resource {
+namespace executor {
+
+// Functions in this namespace are only exposed for testing.
+namespace internal {
+
+// Validates that fields are properly set depending on the type of the executor.
+Option<Error> validateType(const ExecutorInfo& executor);
+
+} // namespace internal {
+} // namespace executor {
+
+
namespace task {
// Validates a task that a framework attempts to launch within the
@@ -92,9 +104,12 @@ Option<Error> validate(
// Functions in this namespace are only exposed for testing.
namespace internal {
-// Validates resources of the task and executor (if present).
+// Validates resources of the task.
Option<Error> validateResources(const TaskInfo& task);
+// Validates resources of the task and its executor.
+Option<Error> validateTaskAndExecutorResources(const TaskInfo& task);
+
// Validates the kill policy of the task.
Option<Error> validateKillPolicy(const TaskInfo& task);
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb58d38d/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index ad89812..4800df1 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -702,7 +702,6 @@ TEST_F(TaskValidationTest, TaskUsesCommandInfoAndExecutorInfo)
}
-// TODO(vinod): Revisit this test after `TaskGroup` validation is implemented.
TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
{
Try<Owned<cluster::Master>> master = StartMaster();
@@ -719,6 +718,8 @@ TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
EXPECT_CALL(sched, registered(&driver, _, _));
// Create an executor without command info.
+ // Note that we don't set type as 'CUSTOM' because it is not
+ // required for `LAUNCH` operation.
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
@@ -742,6 +743,49 @@ TEST_F(TaskValidationTest, TaskUsesExecutorInfoWithoutCommandInfo)
}
+// This test verifies that a scheduler cannot explicitly specify
+// a 'DEFAULT' executor when using `LAUNCH` operation.
+// TODO(vinod): Revisit this when the above is allowed.
+TEST_F(TaskValidationTest, TaskUsesDefaultExecutor)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // Create a 'DEFAULT' executor.
+ ExecutorInfo executor;
+ executor.set_type(ExecutorInfo::DEFAULT);
+ executor.mutable_executor_id()->set_value("default");
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(executor, 1, 1, 16, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.start();
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_ERROR, status->state());
+ EXPECT_TRUE(strings::startsWith(
+ status->message(), "'ExecutorInfo.type' must be 'CUSTOM'"));
+
+ driver.stop();
+ driver.join();
+}
+
+
TEST_F(TaskValidationTest, TaskUsesNoResources)
{
Try<Owned<cluster::Master>> master = StartMaster();
@@ -842,7 +886,7 @@ TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered)
EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
EXPECT_TRUE(status.get().has_message());
EXPECT_TRUE(strings::contains(
- status.get().message(), "Task uses more resources"));
+ status.get().message(), "more than available"));
driver.stop();
driver.join();
@@ -1013,7 +1057,7 @@ TEST_F(TaskValidationTest, ExecutorInfoDiffersOnSameSlave)
EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
EXPECT_TRUE(status.get().has_message());
EXPECT_TRUE(strings::contains(
- status.get().message(), "Task has invalid ExecutorInfo"));
+ status.get().message(), "ExecutorInfo is not compatible"));
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
@@ -1190,7 +1234,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
task.add_resources()->CopyFrom(cpus);
executor.add_resources()->CopyFrom(cpus);
task.mutable_executor()->CopyFrom(executor);
- EXPECT_NONE(task::internal::validateResources(task));
+ EXPECT_NONE(task::internal::validateTaskAndExecutorResources(task));
// Revocable cpus.
Resource revocableCpus = cpus;
@@ -1202,7 +1246,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
executor.clear_resources();
executor.add_resources()->CopyFrom(revocableCpus);
task.mutable_executor()->CopyFrom(executor);
- EXPECT_NONE(task::internal::validateResources(task));
+ EXPECT_NONE(task::internal::validateTaskAndExecutorResources(task));
// A task with revocable cpus and its executor with non-revocable
// cpus is invalid.
@@ -1211,7 +1255,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
executor.clear_resources();
executor.add_resources()->CopyFrom(cpus);
task.mutable_executor()->CopyFrom(executor);
- EXPECT_SOME(task::internal::validateResources(task));
+ EXPECT_SOME(task::internal::validateTaskAndExecutorResources(task));
// A task with non-revocable cpus and its executor with
// revocable cpus is invalid.
@@ -1220,7 +1264,7 @@ TEST_F(TaskValidationTest, TaskAndExecutorUseRevocableResources)
executor.clear_resources();
executor.add_resources()->CopyFrom(revocableCpus);
task.mutable_executor()->CopyFrom(executor);
- EXPECT_SOME(task::internal::validateResources(task));
+ EXPECT_SOME(task::internal::validateTaskAndExecutorResources(task));
}
@@ -1355,6 +1399,57 @@ TEST_F(TaskValidationTest, KillPolicyGracePeriodIsNonNegative)
// TODO(benh): Add tests which launch multiple tasks and check for
// aggregate resource usage.
+
+class ExecutorValidationTest : public MesosTest {};
+
+
+TEST_F(ExecutorValidationTest, ExecutorType)
+{
+ ExecutorInfo executorInfo;
+ executorInfo = DEFAULT_EXECUTOR_INFO;
+ executorInfo.mutable_framework_id()->set_value(UUID::random().toString());
+
+ {
+ // 'CUSTOM' executor with `CommandInfo` set is valid.
+ executorInfo.set_type(ExecutorInfo::CUSTOM);
+ executorInfo.mutable_command();
+
+ EXPECT_NONE(::executor::internal::validateType(executorInfo));
+ }
+
+ {
+ // 'CUSTOM' executor without `CommandInfo` set is invalid.
+ executorInfo.set_type(ExecutorInfo::CUSTOM);
+ executorInfo.clear_command();
+
+ Option<Error> error = ::executor::internal::validateType(executorInfo);
+ EXPECT_SOME(error);
+ EXPECT_TRUE(strings::contains(
+ error->message,
+ "'ExecutorInfo.command' must be set for 'CUSTOM' executor"));
+ }
+
+ {
+ // 'DEFAULT' executor without `CommandInfo` set is valid.
+ executorInfo.set_type(ExecutorInfo::DEFAULT);
+ executorInfo.clear_command();
+
+ EXPECT_NONE(::executor::internal::validateType(executorInfo));
+ }
+
+ {
+ // 'DEFAULT' executor with `CommandInfo` set is invalid.
+ executorInfo.set_type(ExecutorInfo::DEFAULT);
+ executorInfo.mutable_command();
+
+ Option<Error> error = ::executor::internal::validateType(executorInfo);
+ EXPECT_SOME(error);
+ EXPECT_TRUE(strings::contains(
+ error->message,
+ "'ExecutorInfo.command' must not be set for 'DEFAULT' executor"));
+ }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {