You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/12/18 23:40:28 UTC
mesos git commit: Adjusted the calculation of unused resources in
_launchTasks by considering persistent disk acquisition.
Repository: mesos
Updated Branches:
refs/heads/master 31625aa79 -> 4c9813f2b
Adjusted the calculation of unused resources in _launchTasks by
considering persistent disk acquisition.
Review: https://reviews.apache.org/r/28720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4c9813f2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4c9813f2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4c9813f2
Branch: refs/heads/master
Commit: 4c9813f2bc55e7dbf81b7b82641342fcc6ac73ad
Parents: 31625aa
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 16 15:51:48 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 18 14:27:44 2014 -0800
----------------------------------------------------------------------
src/master/master.cpp | 111 +++++++++++++++++++++++++++----
src/tests/resource_offers_tests.cpp | 101 +++++++++++++++++++++++++---
2 files changed, 192 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4c9813f2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0c98b51..a03469c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1905,7 +1905,7 @@ struct ResourceValidator : TaskInfoValidator
// TODO(jieyu): The check we have right now is a partial check for
// the current task. We need to add checks against slave's
// existing tasks and executors as well.
- hashset<string> persistenceIds;
+ hashmap<string, hashset<string>> persistenceIds;
Option<Error> error = Resources::validate(task.resources());
if (error.isSome()) {
@@ -1922,11 +1922,14 @@ struct ResourceValidator : TaskInfoValidator
}
if (resource.disk().has_persistence()) {
+ string role = resource.role();
string id = resource.disk().persistence().id();
- if (persistenceIds.contains(id)) {
+
+ if (persistenceIds[role].contains(id)) {
return Error("Task uses duplicated persistence ID " + id);
}
- persistenceIds.insert(id);
+
+ persistenceIds[role].insert(id);
}
}
}
@@ -1949,11 +1952,14 @@ struct ResourceValidator : TaskInfoValidator
}
if (resource.disk().has_persistence()) {
+ string role = resource.role();
string id = resource.disk().persistence().id();
- if (persistenceIds.contains(id)) {
+
+ if (persistenceIds[role].contains(id)) {
return Error("Executor uses duplicated persistence ID " + id);
}
- persistenceIds.insert(id);
+
+ persistenceIds[role].insert(id);
}
}
}
@@ -2061,10 +2067,60 @@ struct ResourceUsageValidator : TaskInfoValidator
resources += executorResources;
}
- if (!offeredResources.contains(resources + usedResources)) {
+ // We allow frameworks to implicitly acquire persistent disks
+ // through task and executor resources. This means that we need to
+ // infer these implicit disk acquisition transformations on the
+ // offered resources, so that we can validate resource usage.
+ //
+ // NOTE: ResourceValidator ensures that there are no duplicate
+ // persistence IDs per role in 'resources'.
+ //
+ // NOTE: 'offeredResources' will not contain duplicate persistence
+ // IDs per role, given we do not construct such offers.
+ Resources::CompositeTransformation transformation;
+ foreach (const Resource& disk, resources.persistentDisks()) {
+ if (!offeredResources.contains(disk)) {
+ // This is an implicit acquisition. The framework is not
+ // allowed to mutate an offered persistent disk, so we need to
+ // check the offered resources for this persistence ID within
+ // the role.
+ //
+ // TODO(jieyu): We need to ensure this persistence ID within
+ // the role does not clash with any in-use persistent disks on
+ // the slave.
+ string id = disk.disk().persistence().id();
+ foreach (const Resource& offered, offeredResources.persistentDisks()) {
+ if (offered.role() == disk.role() &&
+ offered.disk().persistence().id() == id) {
+ return Error("Duplicated persistence ID '" + id + "'");
+ }
+ }
+
+ transformation.add(Resources::AcquirePersistentDisk(disk));
+ }
+ }
+
+ // Validate that the offered resources are sufficient for
+ // launching this task/executor. To do that, we must first apply
+ // the implicit transformations.
+ Try<Resources> transformedOfferedResources =
+ transformation(offeredResources);
+
+ if (transformedOfferedResources.isError()) {
+ // TODO(jieyu): Revisit this error message once we start to
+ // allow other types of transformations (e.g., dynamic
+ // reservations).
+ return Error(
+ "Failed to acquire persistent disks: " +
+ transformedOfferedResources.error());
+ }
+
+ if (!transformedOfferedResources.get()
+ .contains(resources + usedResources)) {
return Error(
"Task uses more resources " + stringify(resources) +
- " than available " + stringify(offeredResources - usedResources));
+ " than available " +
+ stringify(transformedOfferedResources.get() - usedResources));
}
return None();
@@ -2634,7 +2690,14 @@ void Master::_launchTasks(
return;
}
- Resources usedResources; // Accumulated resources used.
+
+ // We need to transform the offered resources by considering
+ // resource transformations like persistent disk acquisition.
+ Resources transformedOfferedResources = offeredResources;
+
+ // Accumulated resources used by launched tasks.
+ Resources usedResources;
+
size_t index = 0;
foreach (const Future<bool>& authorization, authorizations.get()) {
const TaskInfo& task = tasks[index++];
@@ -2684,7 +2747,7 @@ void Master::_launchTasks(
task,
framework,
slave,
- offeredResources,
+ transformedOfferedResources,
usedResources);
if (validationError.isSome()) {
@@ -2709,8 +2772,32 @@ void Master::_launchTasks(
if (pending) {
usedResources += addTask(task, framework, slave);
- // TODO(bmahler): Consider updating this log message to
- // indicate when the executor is also being launched.
+ // We allow frameworks to implicitly acquire persistent disks
+ // through resources, meaning that they can transform the
+ // offered resources. We need to infer those acquisitions.
+ Resources::CompositeTransformation transformation;
+ foreach (const Resource& disk, usedResources.persistentDisks()) {
+ if (!transformedOfferedResources.contains(disk)) {
+ // NOTE: No need to check duplicated persistence ID because
+ // it should have been validated in ResourceUsageValidator.
+ transformation.add(Resources::AcquirePersistentDisk(disk));
+ }
+ }
+
+ // Adjust the total resources by applying the transformation.
+ Try<Resources> _transformedOfferedResources =
+ transformation(transformedOfferedResources);
+
+ // NOTE: The transformation should have also been validated in
+ // ResourceUsageValidator.
+ CHECK_SOME(_transformedOfferedResources);
+ transformedOfferedResources = _transformedOfferedResources.get();
+
+ // TODO(jieyu): Call 'allocator->transformAllocation(...)' here
+ // to update the allocator.
+
+ // TODO(bmahler): Consider updating this log message to indicate
+ // when the executor is also being launched.
LOG(INFO) << "Launching task " << task.task_id()
<< " of framework " << *framework
<< " with resources " << task.resources()
@@ -2727,7 +2814,7 @@ void Master::_launchTasks(
}
// Calculate unused resources.
- Resources unusedResources = offeredResources - usedResources;
+ Resources unusedResources = transformedOfferedResources - usedResources;
if (!unusedResources.empty()) {
// Tell the allocator about the unused (e.g., refused) resources.
http://git-wip-us.apache.org/repos/asf/mesos/blob/4c9813f2/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index e13b6c5..d098e70 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -256,7 +256,6 @@ TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered)
EXPECT_EQ(TASK_ERROR, status.get().state());
EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
EXPECT_TRUE(status.get().has_message());
-
EXPECT_TRUE(strings::contains(
status.get().message(), "Task uses more resources"));
@@ -583,7 +582,7 @@ TEST_F(TaskValidationTest, UnreservedDiskInfo)
Resource diskResource = Resources::parse("disk", "128", "*").get();
diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
- // Include persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -645,7 +644,7 @@ TEST_F(TaskValidationTest, InvalidPersistenceID)
Resource diskResource = Resources::parse("disk", "128", "role1").get();
diskResource.mutable_disk()->CopyFrom(createDiskInfo("1/", "1"));
- // Include persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -707,7 +706,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithoutVolume)
Resource diskResource = Resources::parse("disk", "128", "role1").get();
diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", None()));
- // Include persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -769,7 +768,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithReadOnlyVolume)
Resource diskResource = Resources::parse("disk", "128", "role1").get();
diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1", Volume::RO));
- // Include persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -832,7 +831,7 @@ TEST_F(TaskValidationTest, PersistentDiskInfoWithHostPath)
diskResource.mutable_disk()->CopyFrom(
createDiskInfo("1", "1", Volume::RW, "foo"));
- // Include persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -894,7 +893,7 @@ TEST_F(TaskValidationTest, NonPersistentDiskInfoWithVolume)
Resource diskResource = Resources::parse("disk", "128", "role1").get();
diskResource.mutable_disk()->CopyFrom(createDiskInfo(None(), "1"));
- // Include non-persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource;
@@ -959,7 +958,7 @@ TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
Resource diskResource2 = Resources::parse("disk", "64", "role1").get();
diskResource2.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
- // Include non-persistent disk resource in task resources.
+ // Include other resources in task resources.
Resources taskResources =
Resources::parse("cpus:1;mem:128").get() + diskResource1 + diskResource2;
@@ -991,6 +990,92 @@ TEST_F(TaskValidationTest, DuplicatedPersistenceIDWithinTask)
Shutdown();
}
+
+// This test ensures that a persistent disk that is larger than the
+// offered disk resources results in a failed task.
+TEST_F(TaskValidationTest, AcquirePersistentDiskTooBig)
+{
+ // Create a framework with role "role1".
+ FrameworkInfo frameworkInfo;
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+
+ // Setup ACLs in order to receive offers for "role1".
+ ACLs acls;
+ mesos::ACL::RegisterFramework* acl = acls.add_register_frameworks();
+ acl->mutable_principals()->add_values(frameworkInfo.principal());
+ acl->mutable_roles()->add_values(frameworkInfo.role());
+
+ // Start master with ACLs.
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.roles = frameworkInfo.role();
+ masterFlags.acls = acls;
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus(*):4;mem(*):2048;disk(role1):1024";
+
+ Try<PID<Slave>> slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // Create a persistent disk resource with volume whose size is
+ // larger than the size of the offered disk.
+ Resource diskResource = Resources::parse("disk", "2048", "role1").get();
+ diskResource.mutable_disk()->CopyFrom(createDiskInfo("1", "1"));
+
+ // Include other resources in task resources.
+ Resources taskResources =
+ Resources::parse("cpus:1;mem:128").get() + diskResource;
+
+ Offer offer = offers.get()[0];
+ TaskInfo task =
+ createTask(offer.slave_id(), taskResources, "", DEFAULT_EXECUTOR_ID);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offer.id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_ERROR, status.get().state());
+ EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
+ EXPECT_TRUE(status.get().has_message());
+ EXPECT_TRUE(strings::contains(
+ status.get().message(), "Failed to acquire persistent disks"));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+// TODO(jieyu): Add tests for checking duplicated persistence ID
+// against offered resources.
+
// TODO(jieyu): Add tests for checking duplicated persistence ID
// across task and executors.