You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/05/03 09:03:14 UTC
[1/2] mesos git commit: Metrics for used resources should incorporate
shared resources.
Repository: mesos
Updated Branches:
refs/heads/master 5a299634e -> b5f595f7e
Metrics for used resources should incorporate shared resources.
The following metrics now take shared resources into account (i.e.,
the quantity of a shared resource is not affected by its share count)
when determining the actual amount of used resources:
a) `master/<resource-name>_used`
b) `master/<resource-name>_revocable_used`
c) `slave/<resource-name>_used`
d) `slave/<resource-name>_revocable_used`
Review: https://reviews.apache.org/r/57963/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25e3b141
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25e3b141
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25e3b141
Branch: refs/heads/master
Commit: 25e3b1412b51b0991da93b93b853e4f48cf73160
Parents: 5a29963
Author: Anindya Sinha <an...@apple.com>
Authored: Wed May 3 01:45:43 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed May 3 02:00:18 2017 -0700
----------------------------------------------------------------------
src/master/master.cpp | 26 +++--
src/slave/slave.cpp | 24 ++--
src/tests/persistent_volume_tests.cpp | 180 +++++++++++++++++++++++++++++
3 files changed, 206 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 31a7a2f..87c647b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9079,13 +9079,16 @@ double Master::_resources_used(const string& name)
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
+ // We use `Resources` arithmetic to accummulate the resources since the
+ // `+=` operator de-duplicates the same shared resources across frameworks.
+ Resources slaveUsed;
+
foreachvalue (const Resources& resources, slave->usedResources) {
- foreach (const Resource& resource, resources.nonRevocable()) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- used += resource.scalar().value();
- }
- }
+ slaveUsed += resources.nonRevocable();
}
+
+ used +=
+ slaveUsed.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
}
return used;
@@ -9125,13 +9128,16 @@ double Master::_resources_revocable_used(const string& name)
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
+ // We use `Resources` arithmetic to accummulate the resources since the
+ // `+=` operator de-duplicates the same shared resources across frameworks.
+ Resources slaveUsed;
+
foreachvalue (const Resources& resources, slave->usedResources) {
- foreach (const Resource& resource, resources.revocable()) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- used += resource.scalar().value();
- }
- }
+ slaveUsed += resources.revocable();
}
+
+ used +=
+ slaveUsed.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
}
return used;
http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8b8078d..284f872 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -6740,19 +6740,17 @@ double Slave::_resources_total(const string& name)
double Slave::_resources_used(const string& name)
{
- double used = 0.0;
+ // We use `Resources` arithmetic to accummulate the resources since the
+ // `+=` operator de-duplicates the same shared resources across executors.
+ Resources used;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
- foreach (const Resource& resource, executor->resources.nonRevocable()) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- used += resource.scalar().value();
- }
- }
+ used += executor->resources.nonRevocable();
}
}
- return used;
+ return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
}
@@ -6786,19 +6784,17 @@ double Slave::_resources_revocable_total(const string& name)
double Slave::_resources_revocable_used(const string& name)
{
- double used = 0.0;
+ // We use `Resources` arithmetic to accummulate the resources since the
+ // `+=` operator de-duplicates the same shared resources across executors.
+ Resources used;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
- foreach (const Resource& resource, executor->resources.revocable()) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- used += resource.scalar().value();
- }
- }
+ used += executor->resources.revocable();
}
}
- return used;
+ return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index d42fd18..d6a48af 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -1187,6 +1187,186 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
}
+// This test verifies that multiple frameworks belonging to the same role
+// can use the same shared persistent volume to launch tasks simultaneously.
+// It also verifies that metrics for used resources are populated on the
+// master and the agent.
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ slaveFlags.resources = getSlaveResources();
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // 1. Create framework1 so that all resources are offered to this framework.
+ FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo1.set_role(DEFAULT_TEST_ROLE);
+ frameworkInfo1.add_capabilities()->set_type(
+ FrameworkInfo::Capability::SHARED_RESOURCES);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched1, registered(&driver1, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver1.start();
+
+ Clock::advance(masterFlags.allocation_interval);
+
+ AWAIT_READY(offers1);
+ EXPECT_FALSE(offers1->empty());
+
+ Offer offer1 = offers1.get()[0];
+
+ // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset
+ // of resources from the offer.
+ Resource volume = createPersistentVolume(
+ getDiskResource(Megabytes(2048)),
+ "id1",
+ "path1",
+ None(),
+ frameworkInfo1.principal(),
+ true); // Shared volume.
+
+ // Create a task which uses a portion of the offered resources, so that
+ // the remaining resources can be offered to framework2.
+ TaskInfo task1 = createTask(
+ offer1.slave_id(),
+ Resources::parse("cpus:1;mem:128").get() + volume,
+ "echo abc > path1/file1 && sleep 1000");
+
+ // We should receive a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status1;
+
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status1));
+
+ // We use a filter of 0 seconds so the resources will be available
+ // in the next allocation cycle.
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ driver1.acceptOffers(
+ {offer1.id()},
+ {CREATE(volume),
+ LAUNCH({task1})},
+ filters);
+
+ AWAIT_READY(status1);
+ EXPECT_EQ(TASK_RUNNING, status1->state());
+
+ // Collect metrics based on framework1.
+ JSON::Object stats1 = Metrics();
+ ASSERT_EQ(1u, stats1.values.count("master/cpus_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/mem_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/disk_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/disk_revocable_used"));
+ EXPECT_EQ(1, stats1.values["master/cpus_used"]);
+ EXPECT_EQ(128, stats1.values["master/mem_used"]);
+ EXPECT_EQ(2048, stats1.values["master/disk_used"]);
+ EXPECT_EQ(0, stats1.values["master/disk_revocable_used"]);
+ ASSERT_EQ(1u, stats1.values.count("slave/cpus_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/mem_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/disk_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/disk_revocable_used"));
+ EXPECT_EQ(2048, stats1.values["slave/disk_used"]);
+ EXPECT_EQ(0, stats1.values["slave/disk_revocable_used"]);
+
+ // 3. Create framework2 of the same role. It would be offered resources
+ // recovered from the framework1 call.
+ FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo2.set_role(DEFAULT_TEST_ROLE);
+ frameworkInfo2.add_capabilities()->set_type(
+ FrameworkInfo::Capability::SHARED_RESOURCES);
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(
+ &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched2, registered(&driver2, _, _));
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver2.start();
+
+ AWAIT_READY(offers2);
+ EXPECT_FALSE(offers2->empty());
+
+ Offer offer2 = offers2.get()[0];
+
+ EXPECT_TRUE(Resources(offer2.resources()).contains(
+ allocatedResources(volume, frameworkInfo2.role())));
+
+ // 4. framework2 LAUNCHes a task with a subset of resources from the offer.
+
+ // Create a task `task2` which uses the same shared volume as `task1`.
+ TaskInfo task2 = createTask(
+ offer2.slave_id(),
+ Resources::parse("cpus:1;mem:256").get() + volume,
+ "echo abc > path1/file2 && sleep 1000");
+
+ // We should receive a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status2;
+
+ EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ driver2.acceptOffers(
+ {offer2.id()},
+ {LAUNCH({task2})},
+ filters);
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_RUNNING, status2->state());
+
+ // Collect metrics based on both frameworks. Note that the `cpus_used` and
+ // `mem_used` is updated, but `disk_used` does not change since both tasks
+ // use the same shared volume.
+ JSON::Object stats2 = Metrics();
+ ASSERT_EQ(1u, stats2.values.count("master/cpus_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/mem_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/disk_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/disk_revocable_used"));
+ EXPECT_EQ(2, stats2.values["master/cpus_used"]);
+ EXPECT_EQ(384, stats2.values["master/mem_used"]);
+ EXPECT_EQ(2048, stats2.values["master/disk_used"]);
+ EXPECT_EQ(0, stats2.values["master/disk_revocable_used"]);
+ ASSERT_EQ(1u, stats2.values.count("slave/cpus_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/mem_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/disk_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/disk_revocable_used"));
+ EXPECT_EQ(2048, stats2.values["slave/disk_used"]);
+ EXPECT_EQ(0, stats2.values["slave/disk_revocable_used"]);
+
+ // Resume the clock so the terminating task and executor can be reaped.
+ Clock::resume();
+
+ driver1.stop();
+ driver1.join();
+
+ driver2.stop();
+ driver2.join();
+}
+
+
// This test verifies that the master recovers after a failover and
// re-offers the shared persistent volume when tasks using the same
// volume are still running.
[2/2] mesos git commit: Added a test to verify metrics when shared
resources are present.
Posted by ya...@apache.org.
Added a test to verify metrics when shared resources are present.
Review: https://reviews.apache.org/r/57964/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5f595f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5f595f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5f595f7
Branch: refs/heads/master
Commit: b5f595f7e7f0941fd0ddc5edb37b2c06062de253
Parents: 25e3b14
Author: Anindya Sinha <an...@apple.com>
Authored: Wed May 3 01:52:33 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed May 3 02:02:05 2017 -0700
----------------------------------------------------------------------
src/tests/persistent_volume_tests.cpp | 180 +++++++++++++++++++++++++++++
1 file changed, 180 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5f595f7/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index d6a48af..984f1b7 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -1189,6 +1189,186 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
// This test verifies that multiple frameworks belonging to the same role
// can use the same shared persistent volume to launch tasks simultaneously.
+// It also verifies that metrics for used resources are correctly populated
+// on the master and the agent.
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ slaveFlags.resources = getSlaveResources();
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // 1. Create framework1 so that all resources are offered to this framework.
+ FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo1.set_role(DEFAULT_TEST_ROLE);
+ frameworkInfo1.add_capabilities()->set_type(
+ FrameworkInfo::Capability::SHARED_RESOURCES);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched1, registered(&driver1, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver1.start();
+
+ Clock::advance(masterFlags.allocation_interval);
+
+ AWAIT_READY(offers1);
+ EXPECT_FALSE(offers1->empty());
+
+ Offer offer1 = offers1.get()[0];
+
+ // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset
+ // of resources from the offer.
+ Resource volume = createPersistentVolume(
+ getDiskResource(Megabytes(2048)),
+ "id1",
+ "path1",
+ None(),
+ frameworkInfo1.principal(),
+ true); // Shared volume.
+
+ // Create a task which uses a portion of the offered resources, so that
+ // the remaining resources can be offered to framework2.
+ TaskInfo task1 = createTask(
+ offer1.slave_id(),
+ Resources::parse("cpus:1;mem:128").get() + volume,
+ "echo abc > path1/file1 && sleep 1000");
+
+ // We should receive a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status1;
+
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status1));
+
+ // We use a filter of 0 seconds so the resources will be available
+ // in the next allocation cycle.
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ driver1.acceptOffers(
+ {offer1.id()},
+ {CREATE(volume),
+ LAUNCH({task1})},
+ filters);
+
+ AWAIT_READY(status1);
+ EXPECT_EQ(TASK_RUNNING, status1->state());
+
+ // Collect metrics based on framework1.
+ JSON::Object stats1 = Metrics();
+ ASSERT_EQ(1u, stats1.values.count("master/cpus_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/mem_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/disk_used"));
+ ASSERT_EQ(1u, stats1.values.count("master/disk_revocable_used"));
+ EXPECT_EQ(1, stats1.values["master/cpus_used"]);
+ EXPECT_EQ(128, stats1.values["master/mem_used"]);
+ EXPECT_EQ(2048, stats1.values["master/disk_used"]);
+ EXPECT_EQ(0, stats1.values["master/disk_revocable_used"]);
+ ASSERT_EQ(1u, stats1.values.count("slave/cpus_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/mem_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/disk_used"));
+ ASSERT_EQ(1u, stats1.values.count("slave/disk_revocable_used"));
+ EXPECT_EQ(2048, stats1.values["slave/disk_used"]);
+ EXPECT_EQ(0, stats1.values["slave/disk_revocable_used"]);
+
+ // 3. Create framework2 of the same role. It would be offered resources
+ // recovered from the framework1 call.
+ FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo2.set_role(DEFAULT_TEST_ROLE);
+ frameworkInfo2.add_capabilities()->set_type(
+ FrameworkInfo::Capability::SHARED_RESOURCES);
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(
+ &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched2, registered(&driver2, _, _));
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver2.start();
+
+ AWAIT_READY(offers2);
+ EXPECT_FALSE(offers2->empty());
+
+ Offer offer2 = offers2.get()[0];
+
+ EXPECT_TRUE(Resources(offer2.resources()).contains(
+ allocatedResources(volume, frameworkInfo2.role())));
+
+ // 4. framework2 LAUNCHes a task with a subset of resources from the offer.
+
+ // Create a task `task2` which uses the same shared volume as `task1`.
+ TaskInfo task2 = createTask(
+ offer2.slave_id(),
+ Resources::parse("cpus:1;mem:256").get() + volume,
+ "echo abc > path1/file2 && sleep 1000");
+
+ // We should receive a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status2;
+
+ EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ driver2.acceptOffers(
+ {offer2.id()},
+ {LAUNCH({task2})},
+ filters);
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_RUNNING, status2->state());
+
+ // Collect metrics based on both frameworks. Note that the `cpus_used` and
+ // `mem_used` is updated, but `disk_used` does not change since both tasks
+ // use the same shared volume.
+ JSON::Object stats2 = Metrics();
+ ASSERT_EQ(1u, stats2.values.count("master/cpus_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/mem_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/disk_used"));
+ ASSERT_EQ(1u, stats2.values.count("master/disk_revocable_used"));
+ EXPECT_EQ(2, stats2.values["master/cpus_used"]);
+ EXPECT_EQ(384, stats2.values["master/mem_used"]);
+ EXPECT_EQ(2048, stats2.values["master/disk_used"]);
+ EXPECT_EQ(0, stats2.values["master/disk_revocable_used"]);
+ ASSERT_EQ(1u, stats2.values.count("slave/cpus_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/mem_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/disk_used"));
+ ASSERT_EQ(1u, stats2.values.count("slave/disk_revocable_used"));
+ EXPECT_EQ(2048, stats2.values["slave/disk_used"]);
+ EXPECT_EQ(0, stats2.values["slave/disk_revocable_used"]);
+
+ // Resume the clock so the terminating task and executor can be reaped.
+ Clock::resume();
+
+ driver1.stop();
+ driver1.join();
+
+ driver2.stop();
+ driver2.join();
+}
+
+
+// This test verifies that multiple frameworks belonging to the same role
+// can use the same shared persistent volume to launch tasks simultaneously.
// It also verifies that metrics for used resources are populated on the
// master and the agent.
TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)