You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/12/18 23:40:28 UTC

mesos git commit: Adjusted the calculation of unused resources in _launchTasks by considering persistent disk acquisition.

Repository: mesos
Updated Branches:
  refs/heads/master 31625aa79 -> 4c9813f2b


Adjusted the calculation of unused resources in _launchTasks by
considering persistent disk acquisition.

Review: https://reviews.apache.org/r/28720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c9813f2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c9813f2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c9813f2

Branch: refs/heads/master
Commit: 4c9813f2bc55e7dbf81b7b82641342fcc6ac73ad
Parents: 31625aa
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 16 15:51:48 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 18 14:27:44 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp               | 111 +++++++++++++++++++++++++++----
 src/tests/resource_offers_tests.cpp | 101 +++++++++++++++++++++++++---
 2 files changed, 192 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4c9813f2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0c98b51..a03469c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1905,7 +1905,7 @@ struct ResourceValidator : TaskInfoValidator
     // TODO(jieyu): The check we have right now is a partial check for
     // the current task. We need to add checks against slave's
     // existing tasks and executors as well.
-    hashset<string> persistenceIds;
+    hashmap<string, hashset<string>> persistenceIds;
 
     Option<Error> error = Resources::validate(task.resources());
     if (error.isSome()) {
@@ -1922,11 +1922,14 @@ struct ResourceValidator : TaskInfoValidator
         }
 
         if (resource.disk().has_persistence()) {
+          string role = resource.role();
           string id = resource.disk().persistence().id();
-          if (persistenceIds.contains(id)) {
+
+          if (persistenceIds[role].contains(id)) {
             return Error("Task uses duplicated persistence ID " + id);
           }
-          persistenceIds.insert(id);
+
+          persistenceIds[role].insert(id);
         }
       }
     }
@@ -1949,11 +1952,14 @@ struct ResourceValidator : TaskInfoValidator
           }
 
           if (resource.disk().has_persistence()) {
+            string role = resource.role();
             string id = resource.disk().persistence().id();
-            if (persistenceIds.contains(id)) {
+
+            if (persistenceIds[role].contains(id)) {
               return Error("Executor uses duplicated persistence ID " + id);
             }
-            persistenceIds.insert(id);
+
+            persistenceIds[role].insert(id);
           }
         }
       }
@@ -2061,10 +2067,60 @@ struct ResourceUsageValidator : TaskInfoValidator
       resources += executorResources;
     }
 
-    if (!offeredResources.contains(resources + usedResources)) {
+    // We allow frameworks to implicitly acquire persistent disks
+    // through task and executor resources. This means that we need to
+    // infer these implicit disk acquisition transformations on the
+    // offered resources, so that we can validate resource usage.
+    //
+    // NOTE: ResourceValidator ensures that there are no duplicate
+    // persistence IDs per role in 'resources'.
+    //
+    // NOTE: 'offeredResources' will not contain duplicate persistence
+    // IDs per role, given we do not construct such offers.
+    Resources::CompositeTransformation transformation;
+    foreach (const Resource& disk, resources.persistentDisks()) {
+      if (!offeredResources.contains(disk)) {
+        // This is an implicit acquisition. The framework is not
+        // allowed to mutate an offered persistent disk, so we need to
+        // check the offered resources for this persistence ID within
+        // the role.
+        //
+        // TODO(jieyu): We need to ensure this persistence ID within
+        // the role does not clash with any in-use persistent disks on
+        // the slave.
+        string id = disk.disk().persistence().id();
+        foreach (const Resource& offered, offeredResources.persistentDisks()) {
+          if (offered.role() == disk.role() &&
+              offered.disk().persistence().id() == id) {
+            return Error("Duplicated persistence ID '" + id + "'");
+          }
+        }
+
+        transformation.add(Resources::AcquirePersistentDisk(disk));
+      }
+    }
+
+    // Validate that the offered resources are sufficient for
+    // launching this task/executor. To do that, we must first apply
+    // the implicit transformations.
+    Try<Resources> transformedOfferedResources =
+      transformation(offeredResources);
+
+    if (transformedOfferedResources.isError()) {
+      // TODO(jieyu): Revisit this error message once we start to
+      // allow other types of transformations (e.g., dynamic
+      // reservations).
+      return Error(
+          "Failed to acquire persistent disks: " +
+          transformedOfferedResources.error());
+    }
+
+    if (!transformedOfferedResources.get()
+          .contains(resources + usedResources)) {
       return Error(
           "Task uses more resources " + stringify(resources) +
-          " than available " + stringify(offeredResources - usedResources));
+          " than available " +
+          stringify(transformedOfferedResources.get() - usedResources));
     }
 
     return None();
@@ -2634,7 +2690,14 @@ void Master::_launchTasks(
     return;
   }
 
-  Resources usedResources; // Accumulated resources used.
+
+  // We need to transform the offered resources by considering
+  // resources transformations like persistent disk acquisition.
+  Resources transformedOfferedResources = offeredResources;
+
+  // Accumulated resources used by launched tasks.
+  Resources usedResources;
+
   size_t index = 0;
   foreach (const Future<bool>& authorization, authorizations.get()) {
     const TaskInfo& task = tasks[index++];
@@ -2684,7 +2747,7 @@ void Master::_launchTasks(
         task,
         framework,
         slave,
-        offeredResources,
+        transformedOfferedResources,
         usedResources);
 
     if (validationError.isSome()) {
@@ -2709,8 +2772,32 @@ void Master::_launchTasks(
     if (pending) {
       usedResources += addTask(task, framework, slave);
 
-      // TODO(bmahler): Consider updating this log message to
-      // indicate when the executor is also being launched.
+      // We allow frameworks to implicitly acquire persistent disk
+      // through resources, meaning that they can transform the
+      // offered resources. We need to infer those acquisitions.
+      Resources::CompositeTransformation transformation;
+      foreach (const Resource& disk, usedResources.persistentDisks()) {
+        if (!transformedOfferedResources.contains(disk)) {
+          // NOTE: No need to check duplicated persistence ID because
+          // it should have been validated in ResourceUsageValidator.
+          transformation.add(Resources::AcquirePersistentDisk(disk));
+        }
+      }
+
+      // Adjust the total resources by applying the transformation.
+      Try<Resources> _transformedOfferedResources =
+        transformation(transformedOfferedResources);
+
+      // NOTE: The transformation should have also been validated in
+      // ResourceUsageValidator.
+      CHECK_SOME(_transformedOfferedResources);
+      transformedOfferedResources = _transformedOfferedResources.get();
+
+      // TODO(jieyu): Call 'allocator->transformAllocation(...)' here
+      // to update the allocator.
+
+      // TODO(bmahler): Consider updating this log message to indicate
+      // when the executor is also being launched.
       LOG(INFO) << "Launching task " << task.task_id()
                 << " of framework " << *framework
                 << " with resources " << task.resources()
@@ -2727,7 +2814,7 @@ void Master::_launchTasks(
   }
 
   // Calculate unused resources.
-  Resources unusedResources = offeredResources - usedResources;
+  Resources unusedResources = transformedOfferedResources - usedResources;
 
   if (!unusedResources.empty()) {
     // Tell the allocator about the unused (e.g., refused) resources.

http://git-wip-us.apache.org/repos/asf/mesos/blob/4c9813f2/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index e13b6c5..d098e70 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -256,7 +256,6 @@ TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered)
   EXPECT_EQ(TASK_ERROR, status.get().state());
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
-
   EXPECT_TRUE(strings::contains(
       status.get().message(), "Task uses more resources"));
 
@@ -583,7 +582,7 @@ TEST_F(TaskValidationTest, UnreservedDiskInfo)
   Resource diskResource = Resources::parse("disk", "128", "*").get();
   diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
 
-  // Include persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -645,7 +644,7 @@ TEST_F(TaskValidationTest, InvalidPersistenceID)
   Resource diskResource = Resources::parse("disk", "128", "role1").get();
   diskResource.mutable_disk()->CopyFrom(createDiskInfo("1/", "1"));
 
-  // Include persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -707,7 +706,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithoutVolume)
   Resource diskResource = Resources::parse("disk", "128", "role1").get();
   diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", None()));
 
-  // Include persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -769,7 +768,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithReadOnlyVolume)
   Resource diskResource = Resources::parse("disk", "128", "role1").get();
   diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1", Volume::RO));
 
-  // Include persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -832,7 +831,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithHostPath)
   diskResource.mutable_disk()->CopyFrom(
       createDiskInfo("1", "1", Volume::RW, "foo"));
 
-  // Include persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -894,7 +893,7 @@ TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume)
   Resource diskResource = Resources::parse("disk", "128", "role1").get();
   diskResource.mutable_disk()->CopyFrom(createDiskInfo(None(), "1"));
 
-  // Include non-persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource;
 
@@ -959,7 +958,7 @@ TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
   Resource diskResource2 = Resources::parse("disk", "64", "role1").get();
   diskResource2.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
 
-  // Include non-persistent disk resource in task resources.
+  // Include other resources in task resources.
   Resources taskResources =
     Resources::parse("cpus:1;mem:128").get() + diskResource1 + diskResource2;
 
@@ -991,6 +990,92 @@ TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
   Shutdown();
 }
 
+
+// This test ensures that a persistent disk that is larger than the
+// offered disk resources results in a failed task.
+TEST_F(TaskValidationTest, AcquirePersistentDiskTooBig)
+{
+  // Create a framework with role "role1".
+  FrameworkInfo frameworkInfo;
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+
+  // Set up ACLs in order to receive offers for "role1".
+  ACLs acls;
+  mesos::ACL::RegisterFramework* acl = acls.add_register_frameworks();
+  acl->mutable_principals()->add_values(frameworkInfo.principal());
+  acl->mutable_roles()->add_values(frameworkInfo.role());
+
+  // Start master with ACLs.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.roles = frameworkInfo.role();
+  masterFlags.acls = acls;
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus(*):4;mem(*):2048;disk(role1):1024";
+
+  Try<PID<Slave>> slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create a persistent disk resource with volume whose size is
+  // larger than the size of the offered disk.
+  Resource diskResource = Resources::parse("disk", "2048", "role1").get();
+  diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+  // Include other resources in task resources.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+  Offer offer = offers.get()[0];
+  TaskInfo task =
+    createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_ERROR, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(), "Failed to acquire persistent disks"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// against offered resources.
+
 // TODO(jieyu): Add tests for checking duplicated persistence ID
 // across task and executors.