You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/12/04 01:20:18 UTC

[1/4] mesos git commit: Split resource and resource usage checkers.

Repository: mesos
Updated Branches:
  refs/heads/master 325fccafc -> 22d1f608e


Split resource and resource usage checkers.

Review: https://reviews.apache.org/r/28618


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ac99a56
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ac99a56
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ac99a56

Branch: refs/heads/master
Commit: 7ac99a56cad3302c01a32b74d218281a34ee4bfe
Parents: 325fcca
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 2 16:50:27 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 3 16:12:06 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 62 +++++++++++++++++++++++++++++-----------------
 1 file changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ac99a56/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b910665..c3465a6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1889,13 +1889,8 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
 };
 
 
-// Checks that the used resources by a task on each slave does not
-// exceed the total resources offered on that slave.
-// NOTE: We do not account for executor resources here because tasks
-// are launched asynchronously and an executor might exit between
-// validation and actual launch. Therefore executor resources are
-// accounted for in 'Master::_launchTasks()'.
-struct ResourceUsageChecker : TaskInfoVisitor
+// Checks that resources specified by the framework are valid.
+struct ResourceChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
@@ -1904,18 +1899,11 @@ struct ResourceUsageChecker : TaskInfoVisitor
       const Resources& totalResources,
       const Resources& usedResources)
   {
-    if (task.resources().size() == 0) {
-      return Error("Task uses no resources");
-    }
-
     Option<Error> error = Resources::validate(task.resources());
     if (error.isSome()) {
       return Error("Task uses invalid resources: " + error.get().message);
     }
 
-    // Check this task's executor's resources.
-    Resources executorResources;
-
     if (task.has_executor()) {
       Option<Error> error = Resources::validate(task.executor().resources());
       if (error.isSome()) {
@@ -1923,11 +1911,39 @@ struct ResourceUsageChecker : TaskInfoVisitor
             "Executor for task " + stringify(task.task_id()) +
             " uses invalid resources: " + error.get().message);
       }
+    }
+
+    return None();
+  }
+};
+
 
+// Checks that the task and the executor are using proper amount of
+// resources. For instance, the used resources by a task on each slave
+// should not exceed the total resources offered on that slave.
+struct ResourceUsageChecker : TaskInfoVisitor
+{
+  virtual Option<Error> operator () (
+      const TaskInfo& task,
+      const Framework& framework,
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
+  {
+    Resources taskResources = task.resources();
+
+    if (taskResources.empty()) {
+      return Error("Task uses no resources");
+    }
+
+    Resources executorResources;
+    if (task.has_executor()) {
       executorResources = task.executor().resources();
+    }
 
-      // Check minimal cpus and memory resources of executor and log
-      // warnings if not set.
+    // Check minimal cpus and memory resources of executor and log
+    // warnings if not set.
+    if (task.has_executor()) {
       // TODO(martin): MESOS-1807. Return Error instead of logging a
       // warning in 0.22.0.
       Option<double> cpus =  executorResources.cpus();
@@ -1957,10 +1973,9 @@ struct ResourceUsageChecker : TaskInfoVisitor
 
     // Check if resources needed by the task (and its executor in case
     // the executor is new) are available.
-    Resources resources = task.resources();
-
+    Resources resources = taskResources;
     if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
-      resources += task.executor().resources();
+      resources += executorResources;
     }
 
     if (!totalResources.contains(resources + usedResources)) {
@@ -2343,10 +2358,10 @@ Option<Error> Master::validateTask(
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
 
-  // Create task visitors. The order in which the following checkers
-  // are executed does matter! For example, ResourceUsageChecker
-  // assumes that ExecutorInfo is valid which is verified by
-  // ExecutorInfoChecker.
+  // Create task visitors.
+  // NOTE: The order in which the following checkers are executed does
+  // matter! For example, ResourceUsageChecker assumes that
+  // ExecutorInfo is valid which is verified by ExecutorInfoChecker.
   // TODO(vinod): Create the visitors on the stack and make the visit
   // operation const.
   list<Owned<TaskInfoVisitor>> taskVisitors;
@@ -2355,6 +2370,7 @@ Option<Error> Master::validateTask(
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
 
   // TODO(benh): Add a HealthCheckChecker visitor.


[4/4] mesos git commit: Added duplicated persistence id check in ResourceChecker.

Posted by ji...@apache.org.
Added duplicated persistence id check in ResourceChecker.

Review: https://reviews.apache.org/r/28664


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22d1f608
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22d1f608
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22d1f608

Branch: refs/heads/master
Commit: 22d1f608e842f0452ef243b49778ca82c2bfd7b0
Parents: 3f0f275
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Dec 3 11:25:41 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 3 16:13:44 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp               | 22 ++++++++++
 src/tests/resource_offers_tests.cpp | 71 ++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bf9d20f..3dc4e7a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1899,6 +1899,12 @@ struct ResourceChecker : TaskInfoVisitor
       const Resources& totalResources,
       const Resources& usedResources)
   {
+    // This is used to ensure no duplicated persistence id exists.
+    // TODO(jieyu): The check we have right now is a partial check for
+    // the current task. We need to add checks against slave's
+    // existing tasks and executors as well.
+    hashset<string> persistenceIds;
+
     Option<Error> error = Resources::validate(task.resources());
     if (error.isSome()) {
       return Error("Task uses invalid resources: " + error.get().message);
@@ -1912,6 +1918,14 @@ struct ResourceChecker : TaskInfoVisitor
         if (error.isSome()) {
           return Error("Task uses invalid DiskInfo: " + error.get().message);
         }
+
+        if (resource.disk().has_persistence()) {
+          string id = resource.disk().persistence().id();
+          if (persistenceIds.contains(id)) {
+            return Error("Task uses duplicated persistence ID " + id);
+          }
+          persistenceIds.insert(id);
+        }
       }
     }
 
@@ -1931,6 +1945,14 @@ struct ResourceChecker : TaskInfoVisitor
             return Error(
                 "Executor uses invalid DiskInfo: " + error.get().message);
           }
+
+          if (resource.disk().has_persistence()) {
+            string id = resource.disk().persistence().id();
+            if (persistenceIds.contains(id)) {
+              return Error("Executor uses duplicated persistence ID " + id);
+            }
+            persistenceIds.insert(id);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/22d1f608/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 467c7e5..e13b6c5 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -926,6 +926,77 @@ TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume)
   Shutdown();
 }
 
+
+TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create two persistent disk resources with the same id.
+  Resource diskResource1 = Resources::parse("disk", "128", "role1").get();
+  diskResource1.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  Resource diskResource2 = Resources::parse("disk", "64", "role1").get();
+  diskResource2.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  // Include both persistent disk resources in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource1 + diskResource2;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Task uses duplicated persistence ID 1"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// across task and executors.
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// within an executor.
+
 // TODO(benh): Add tests for checking correct slave IDs.
 
 // TODO(benh): Add tests for checking executor resource usage.


[3/4] mesos git commit: Added tests for basic DiskInfo checker.

Posted by ji...@apache.org.
Added tests for basic DiskInfo checker.

Review: https://reviews.apache.org/r/28627


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f0f275a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f0f275a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f0f275a

Branch: refs/heads/master
Commit: 3f0f275ad1f0ac52c0927740b33eb4c52e2b964a
Parents: 34bb637
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 2 19:02:14 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 3 16:12:07 2014 -0800

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp       |   7 +-
 src/common/protobuf_utils.hpp       |   3 +
 src/tests/mesos.hpp                 |  31 +++
 src/tests/resource_offers_tests.cpp | 377 +++++++++++++++++++++++++++++++
 4 files changed, 415 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0f275a/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index baf04a6..8ab5cdd 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -27,6 +27,8 @@
 
 #include "messages/messages.hpp"
 
+using std::string;
+
 namespace mesos {
 namespace internal {
 namespace protobuf {
@@ -48,7 +50,7 @@ StatusUpdate createStatusUpdate(
     const TaskID& taskId,
     const TaskState& state,
     const TaskStatus::Source& source,
-    const std::string& message = "",
+    const string& message = "",
     const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None())
 {
@@ -117,7 +119,7 @@ MasterInfo createMasterInfo(const process::UPID& pid)
   info.set_port(pid.node.port);
   info.set_pid(pid);
 
-  Try<std::string> hostname = net::getHostname(pid.node.ip);
+  Try<string> hostname = net::getHostname(pid.node.ip);
   if (hostname.isSome()) {
     info.set_hostname(hostname.get());
   }
@@ -125,7 +127,6 @@ MasterInfo createMasterInfo(const process::UPID& pid)
   return info;
 }
 
-
 } // namespace protobuf {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0f275a/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index bc3ef2a..e42aaa5 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -36,6 +36,7 @@ namespace protobuf {
 
 bool isTerminalState(const TaskState& state);
 
+
 StatusUpdate createStatusUpdate(
     const FrameworkID& frameworkId,
     const Option<SlaveID>& slaveId,
@@ -46,11 +47,13 @@ StatusUpdate createStatusUpdate(
     const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None());
 
+
 Task createTask(
     const TaskInfo& task,
     const TaskState& state,
     const FrameworkID& frameworkId);
 
+
 // Helper function that creates a MasterInfo from UPID.
 MasterInfo createMasterInfo(const process::UPID& pid);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0f275a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index aa10343..90c575e 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -359,6 +359,37 @@ inline TaskInfo createTask(
 }
 
 
+// NOTE: We only set the volume in DiskInfo if 'containerPath' is set.
+// If volume mode is not specified, Volume::RW will be used (assuming
+// 'containerPath' is set).
+inline Resource::DiskInfo createDiskInfo(
+    const Option<std::string>& persistenceId,
+    const Option<std::string>& containerPath,
+    const Option<Volume::Mode>& mode = None(),
+    const Option<std::string>& hostPath = None())
+{
+  Resource::DiskInfo info;
+
+  if (persistenceId.isSome()) {
+    info.mutable_persistence()->set_id(persistenceId.get());
+  }
+
+  if (containerPath.isSome()) {
+    Volume volume;
+    volume.set_container_path(containerPath.get());
+    volume.set_mode(mode.isSome() ? mode.get() : Volume::RW);
+
+    if (hostPath.isSome()) {
+      volume.set_host_path(hostPath.get());
+    }
+
+    info.mutable_volume()->CopyFrom(volume);
+  }
+
+  return info;
+}
+
+
 // Definition of a mock Scheduler to be used in tests with gmock.
 class MockScheduler : public Scheduler
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0f275a/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index c4afc38..467c7e5 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -53,6 +53,11 @@ using testing::AtMost;
 using testing::Return;
 
 
+// TODO(jieyu): All of the task validation tests have the same flow:
+// launch a task, expect an update of a particular format (invalid w/
+// message). Consider providing common functionalities in the test
+// fixture to avoid code bloat. Ultimately, we should make task or
+// offer validation unit testable.
 class TaskValidationTest : public MesosTest {};
 
 
@@ -549,6 +554,378 @@ TEST_F(TaskValidationTest, ExecutorInfoDiffersOnDifferentSlaves)
 }
 
 
+TEST_F(TaskValidationTest, UnreservedDiskInfo)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource with "*" role.
+  Resource diskResource = Resources::parse("disk", "128", "*").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  // Include persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Persistent disk volume is disallowed for '*' role"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(TaskValidationTest, InvalidPersistenceID)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource with an invalid persistence id.
+  Resource diskResource = Resources::parse("disk", "128", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo("1/", "1"));
+
+  // Include persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Persistence ID '1/' contains invalid characters"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(TaskValidationTest, PersistentDiskInfoWithoutVolume)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource without a volume.
+  Resource diskResource = Resources::parse("disk", "128", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", None()));
+
+  // Include persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Persistent disk should specify a volume"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(TaskValidationTest, PersistentDiskInfoWithReadOnlyVolume)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource with read-only volume.
+  Resource diskResource = Resources::parse("disk", "128", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1", Volume::RO));
+
+  // Include persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Read-only volume is not supported for DiskInfo"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(TaskValidationTest, PersistentDiskInfoWithHostPath)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource with host path in the volume.
+  Resource diskResource = Resources::parse("disk", "128", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(
+      createDiskInfo("1", "1", Volume::RW, "foo"));
+
+  // Include persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Volume in DiskInfo should not have 'host_path' set"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a non-persistent disk resource with volume.
+  Resource diskResource = Resources::parse("disk", "128", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo(None(), "1"));
+
+  // Include non-persistent disk resource in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(),
+      "Non-persistent disk volume is not supported"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 // TODO(benh): Add tests for checking correct slave IDs.
 
 // TODO(benh): Add tests for checking executor resource usage.


[2/4] mesos git commit: Added basic DiskInfo check in master.

Posted by ji...@apache.org.
Added basic DiskInfo check in master.

Review: https://reviews.apache.org/r/28626


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/34bb637b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/34bb637b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/34bb637b

Branch: refs/heads/master
Commit: 34bb637b2479cede846d8b1140756e06ebc2dafe
Parents: 7ac99a5
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 2 17:26:14 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 3 16:12:07 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 63 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 61 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/34bb637b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c3465a6..bf9d20f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1904,17 +1904,76 @@ struct ResourceChecker : TaskInfoVisitor
       return Error("Task uses invalid resources: " + error.get().message);
     }
 
+    // Ensure any DiskInfos in the task are valid according to the
+    // currently supported semantics.
+    foreach (const Resource& resource, task.resources()) {
+      if (resource.has_disk()) {
+        error = validateDiskInfo(resource);
+        if (error.isSome()) {
+          return Error("Task uses invalid DiskInfo: " + error.get().message);
+        }
+      }
+    }
+
     if (task.has_executor()) {
       Option<Error> error = Resources::validate(task.executor().resources());
       if (error.isSome()) {
         return Error(
-            "Executor for task " + stringify(task.task_id()) +
-            " uses invalid resources: " + error.get().message);
+            "Executor uses invalid resources: " + error.get().message);
+      }
+
+      // Ensure any DiskInfos in the executor are valid according to
+      // the currently supported semantics.
+      foreach (const Resource& resource, task.executor().resources()) {
+        if (resource.has_disk()) {
+          error = validateDiskInfo(resource);
+          if (error.isSome()) {
+            return Error(
+                "Executor uses invalid DiskInfo: " + error.get().message);
+          }
+        }
       }
     }
 
     return None();
   }
+
+  Option<Error> validateDiskInfo(const Resource& resource)
+  {
+    CHECK(resource.has_disk());
+
+    if (resource.disk().has_persistence()) {
+      if (resource.role() == "*") {
+        return Error("Persistent disk volume is disallowed for '*' role");
+      }
+      if (!resource.disk().has_volume()) {
+        return Error("Persistent disk should specify a volume");
+      }
+      if (resource.disk().volume().mode() == Volume::RO) {
+        return Error("Read-only volume is not supported for DiskInfo");
+      }
+      if (resource.disk().volume().has_host_path()) {
+        return Error("Volume in DiskInfo should not have 'host_path' set");
+      }
+
+      // Ensure persistence ID does not have invalid characters.
+      string id = resource.disk().persistence().id();
+      if (std::count_if(id.begin(), id.end(), invalid) > 0) {
+        return Error("Persistence ID '" + id + "' contains invalid characters");
+      }
+    } else {
+      if (resource.disk().has_volume()) {
+        return Error("Non-persistent disk volume is not supported");
+      }
+    }
+
+    return None();
+  }
+
+  static bool invalid(char c)
+  {
+    return iscntrl(c) || c == '/' || c == '\\';
+  }
 };