Posted to commits@mesos.apache.org by ya...@apache.org on 2017/05/03 09:03:14 UTC

[1/2] mesos git commit: Metrics for used resources should incorporate shared resources.

Repository: mesos
Updated Branches:
  refs/heads/master 5a299634e -> b5f595f7e


Metrics for used resources should incorporate shared resources.

The following metrics now take shared resources into account (i.e., the
quantity of a shared resource is not affected by its share count) when
determining the actual amount of used resources; see the sketch after
the list below:
a) `master/<resource-name>_used`
b) `master/<resource-name>_revocable_used`
c) `slave/<resource-name>_used`
d) `slave/<resource-name>_revocable_used`
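
For example (an illustrative sketch, not part of this patch): if two
frameworks on the same agent each use the same 2048 MB shared persistent
volume, `master/disk_used` now reports 2048 rather than 4096. In terms of
the `Resources` arithmetic this patch switches to, where `sharedVolume`
stands for any disk resource marked as shared (e.g. the volume the new
test creates via `createPersistentVolume(..., true)`):

    // Each framework's used resources contain the same shared volume.
    Resources framework1Used =
      Resources::parse("cpus:1;mem:128").get() + sharedVolume;
    Resources framework2Used =
      Resources::parse("cpus:1;mem:256").get() + sharedVolume;

    Resources used;
    used += framework1Used.nonRevocable();
    used += framework2Used.nonRevocable(); // Same shared volume: de-duplicated.

    double diskUsed =
      used.get<Value::Scalar>("disk").getOrElse(Value::Scalar()).value();
    // diskUsed == 2048, i.e. the shared volume is counted once, not once
    // per framework.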

Review: https://reviews.apache.org/r/57963/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25e3b141
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25e3b141
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25e3b141

Branch: refs/heads/master
Commit: 25e3b1412b51b0991da93b93b853e4f48cf73160
Parents: 5a29963
Author: Anindya Sinha <an...@apple.com>
Authored: Wed May 3 01:45:43 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed May 3 02:00:18 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp                 |  26 +++--
 src/slave/slave.cpp                   |  24 ++--
 src/tests/persistent_volume_tests.cpp | 180 +++++++++++++++++++++++++++++
 3 files changed, 206 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 31a7a2f..87c647b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9079,13 +9079,16 @@ double Master::_resources_used(const string& name)
   double used = 0.0;
 
   foreachvalue (Slave* slave, slaves.registered) {
+    // We use `Resources` arithmetic to accumulate the resources since the
+    // `+=` operator de-duplicates the same shared resources across frameworks.
+    Resources slaveUsed;
+
     foreachvalue (const Resources& resources, slave->usedResources) {
-      foreach (const Resource& resource, resources.nonRevocable()) {
-        if (resource.name() == name && resource.type() == Value::SCALAR) {
-          used += resource.scalar().value();
-        }
-      }
+      slaveUsed += resources.nonRevocable();
     }
+
+    used +=
+      slaveUsed.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
   }
 
   return used;
@@ -9125,13 +9128,16 @@ double Master::_resources_revocable_used(const string& name)
   double used = 0.0;
 
   foreachvalue (Slave* slave, slaves.registered) {
+    // We use `Resources` arithmetic to accumulate the resources since the
+    // `+=` operator de-duplicates the same shared resources across frameworks.
+    Resources slaveUsed;
+
     foreachvalue (const Resources& resources, slave->usedResources) {
-      foreach (const Resource& resource, resources.revocable()) {
-        if (resource.name() == name && resource.type() == Value::SCALAR) {
-          used += resource.scalar().value();
-        }
-      }
+      slaveUsed += resources.revocable();
     }
+
+    used +=
+      slaveUsed.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
   }
 
   return used;
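
For contrast, a minimal sketch of why the per-`Resource` loop removed above
over-counted: with two frameworks on one agent both using the same 2048 MB
shared volume, each framework's entry in the agent's used resources carries
its own copy of that volume, so summing scalar values per `Resource` counts
it once per framework (the `usedByFramework` map below is illustrative):

    double used = 0.0;

    // `usedByFramework` stands for the per-framework used resources of a
    // single agent; both entries contain the same shared 2048 MB volume.
    foreachvalue (const Resources& resources, usedByFramework) {
      foreach (const Resource& resource, resources.nonRevocable()) {
        if (resource.name() == "disk" && resource.type() == Value::SCALAR) {
          used += resource.scalar().value();
        }
      }
    }

    // used == 4096: the shared volume is double-counted, which the change
    // above avoids by accumulating into a `Resources` object first.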

http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8b8078d..284f872 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -6740,19 +6740,17 @@ double Slave::_resources_total(const string& name)
 
 double Slave::_resources_used(const string& name)
 {
-  double used = 0.0;
+  // We use `Resources` arithmetic to accumulate the resources since the
+  // `+=` operator de-duplicates the same shared resources across executors.
+  Resources used;
 
   foreachvalue (Framework* framework, frameworks) {
     foreachvalue (Executor* executor, framework->executors) {
-      foreach (const Resource& resource, executor->resources.nonRevocable()) {
-        if (resource.name() == name && resource.type() == Value::SCALAR) {
-          used += resource.scalar().value();
-        }
-      }
+      used += executor->resources.nonRevocable();
     }
   }
 
-  return used;
+  return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
 }
 
 
@@ -6786,19 +6784,17 @@ double Slave::_resources_revocable_total(const string& name)
 
 double Slave::_resources_revocable_used(const string& name)
 {
-  double used = 0.0;
+  // We use `Resources` arithmetic to accumulate the resources since the
+  // `+=` operator de-duplicates the same shared resources across executors.
+  Resources used;
 
   foreachvalue (Framework* framework, frameworks) {
     foreachvalue (Executor* executor, framework->executors) {
-      foreach (const Resource& resource, executor->resources.revocable()) {
-        if (resource.name() == name && resource.type() == Value::SCALAR) {
-          used += resource.scalar().value();
-        }
-      }
+      used += executor->resources.revocable();
     }
   }
 
-  return used;
+  return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/25e3b141/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index d42fd18..d6a48af 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -1187,6 +1187,186 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
 }
 
 
+// This test verifies that multiple frameworks belonging to the same role
+// can use the same shared persistent volume to launch tasks simultaneously.
+// It also verifies that metrics for used resources are populated on the
+// master and the agent.
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // 1. Create framework1 so that all resources are offered to this framework.
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo1.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver1.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers1);
+  EXPECT_FALSE(offers1->empty());
+
+  Offer offer1 = offers1.get()[0];
+
+  // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset
+  //    of resources from the offer.
+  Resource volume = createPersistentVolume(
+      getDiskResource(Megabytes(2048)),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo1.principal(),
+      true); // Shared volume.
+
+  // Create a task which uses a portion of the offered resources, so that
+  // the remaining resources can be offered to framework2.
+  TaskInfo task1 = createTask(
+      offer1.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume,
+      "echo abc > path1/file1 && sleep 1000");
+
+  // We should receive a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status1;
+
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status1));
+
+  // We use a filter of 0 seconds so the resources will be available
+  // in the next allocation cycle.
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  driver1.acceptOffers(
+      {offer1.id()},
+      {CREATE(volume),
+       LAUNCH({task1})},
+      filters);
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_RUNNING, status1->state());
+
+  // Collect metrics based on framework1.
+  JSON::Object stats1 = Metrics();
+  ASSERT_EQ(1u, stats1.values.count("master/cpus_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/mem_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/disk_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/disk_revocable_used"));
+  EXPECT_EQ(1, stats1.values["master/cpus_used"]);
+  EXPECT_EQ(128, stats1.values["master/mem_used"]);
+  EXPECT_EQ(2048, stats1.values["master/disk_used"]);
+  EXPECT_EQ(0, stats1.values["master/disk_revocable_used"]);
+  ASSERT_EQ(1u, stats1.values.count("slave/cpus_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/mem_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/disk_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/disk_revocable_used"));
+  EXPECT_EQ(2048, stats1.values["slave/disk_used"]);
+  EXPECT_EQ(0, stats1.values["slave/disk_revocable_used"]);
+
+  // 3. Create framework2 with the same role. It will be offered the resources
+  //    recovered after framework1's accept call (including the shared volume).
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo2.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched2, registered(&driver2, _, _));
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver2.start();
+
+  AWAIT_READY(offers2);
+  EXPECT_FALSE(offers2->empty());
+
+  Offer offer2 = offers2.get()[0];
+
+  EXPECT_TRUE(Resources(offer2.resources()).contains(
+      allocatedResources(volume, frameworkInfo2.role())));
+
+  // 4. framework2 LAUNCHes a task with a subset of resources from the offer.
+
+  // Create a task `task2` which uses the same shared volume as `task1`.
+  TaskInfo task2 = createTask(
+      offer2.slave_id(),
+      Resources::parse("cpus:1;mem:256").get() + volume,
+      "echo abc > path1/file2 && sleep 1000");
+
+  // We should receive a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status2;
+
+  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver2.acceptOffers(
+      {offer2.id()},
+      {LAUNCH({task2})},
+      filters);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_RUNNING, status2->state());
+
+  // Collect metrics based on both frameworks. Note that `cpus_used` and
+  // `mem_used` are updated, but `disk_used` does not change since both tasks
+  // use the same shared volume.
+  JSON::Object stats2 = Metrics();
+  ASSERT_EQ(1u, stats2.values.count("master/cpus_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/mem_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/disk_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/disk_revocable_used"));
+  EXPECT_EQ(2, stats2.values["master/cpus_used"]);
+  EXPECT_EQ(384, stats2.values["master/mem_used"]);
+  EXPECT_EQ(2048, stats2.values["master/disk_used"]);
+  EXPECT_EQ(0, stats2.values["master/disk_revocable_used"]);
+  ASSERT_EQ(1u, stats2.values.count("slave/cpus_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/mem_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/disk_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/disk_revocable_used"));
+  EXPECT_EQ(2048, stats2.values["slave/disk_used"]);
+  EXPECT_EQ(0, stats2.values["slave/disk_revocable_used"]);
+
+  // Resume the clock so the terminating task and executor can be reaped.
+  Clock::resume();
+
+  driver1.stop();
+  driver1.join();
+
+  driver2.stop();
+  driver2.join();
+}
+
+
 // This test verifies that the master recovers after a failover and
 // re-offers the shared persistent volume when tasks using the same
 // volume are still running.


[2/2] mesos git commit: Added a test to verify metrics when shared resources are present.

Posted by ya...@apache.org.
Added a test to verify metrics when shared resources are present.

Review: https://reviews.apache.org/r/57964/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5f595f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5f595f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5f595f7

Branch: refs/heads/master
Commit: b5f595f7e7f0941fd0ddc5edb37b2c06062de253
Parents: 25e3b14
Author: Anindya Sinha <an...@apple.com>
Authored: Wed May 3 01:52:33 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed May 3 02:02:05 2017 -0700

----------------------------------------------------------------------
 src/tests/persistent_volume_tests.cpp | 180 +++++++++++++++++++++++++++++
 1 file changed, 180 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b5f595f7/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index d6a48af..984f1b7 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -1189,6 +1189,186 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
 
 // This test verifies that multiple frameworks belonging to the same role
 // can use the same shared persistent volume to launch tasks simultaneously.
+// It also verifies that metrics for used resources are correctly populated
+// on the master and the agent.
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // 1. Create framework1 so that all resources are offered to this framework.
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo1.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver1.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers1);
+  EXPECT_FALSE(offers1->empty());
+
+  Offer offer1 = offers1.get()[0];
+
+  // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset
+  //    of resources from the offer.
+  Resource volume = createPersistentVolume(
+      getDiskResource(Megabytes(2048)),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo1.principal(),
+      true); // Shared volume.
+
+  // Create a task which uses a portion of the offered resources, so that
+  // the remaining resources can be offered to framework2.
+  TaskInfo task1 = createTask(
+      offer1.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume,
+      "echo abc > path1/file1 && sleep 1000");
+
+  // We should receive a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status1;
+
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status1));
+
+  // We use a filter of 0 seconds so the resources will be available
+  // in the next allocation cycle.
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  driver1.acceptOffers(
+      {offer1.id()},
+      {CREATE(volume),
+       LAUNCH({task1})},
+      filters);
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_RUNNING, status1->state());
+
+  // Collect metrics based on framework1.
+  JSON::Object stats1 = Metrics();
+  ASSERT_EQ(1u, stats1.values.count("master/cpus_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/mem_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/disk_used"));
+  ASSERT_EQ(1u, stats1.values.count("master/disk_revocable_used"));
+  EXPECT_EQ(1, stats1.values["master/cpus_used"]);
+  EXPECT_EQ(128, stats1.values["master/mem_used"]);
+  EXPECT_EQ(2048, stats1.values["master/disk_used"]);
+  EXPECT_EQ(0, stats1.values["master/disk_revocable_used"]);
+  ASSERT_EQ(1u, stats1.values.count("slave/cpus_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/mem_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/disk_used"));
+  ASSERT_EQ(1u, stats1.values.count("slave/disk_revocable_used"));
+  EXPECT_EQ(2048, stats1.values["slave/disk_used"]);
+  EXPECT_EQ(0, stats1.values["slave/disk_revocable_used"]);
+
+  // 3. Create framework2 with the same role. It will be offered the resources
+  //    recovered after framework1's accept call (including the shared volume).
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo2.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched2, registered(&driver2, _, _));
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver2.start();
+
+  AWAIT_READY(offers2);
+  EXPECT_FALSE(offers2->empty());
+
+  Offer offer2 = offers2.get()[0];
+
+  EXPECT_TRUE(Resources(offer2.resources()).contains(
+      allocatedResources(volume, frameworkInfo2.role())));
+
+  // 4. framework2 LAUNCHes a task with a subset of resources from the offer.
+
+  // Create a task `task2` which uses the same shared volume as `task1`.
+  TaskInfo task2 = createTask(
+      offer2.slave_id(),
+      Resources::parse("cpus:1;mem:256").get() + volume,
+      "echo abc > path1/file2 && sleep 1000");
+
+  // We should receive a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status2;
+
+  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver2.acceptOffers(
+      {offer2.id()},
+      {LAUNCH({task2})},
+      filters);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_RUNNING, status2->state());
+
+  // Collect metrics based on both frameworks. Note that `cpus_used` and
+  // `mem_used` are updated, but `disk_used` does not change since both tasks
+  // use the same shared volume.
+  JSON::Object stats2 = Metrics();
+  ASSERT_EQ(1u, stats2.values.count("master/cpus_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/mem_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/disk_used"));
+  ASSERT_EQ(1u, stats2.values.count("master/disk_revocable_used"));
+  EXPECT_EQ(2, stats2.values["master/cpus_used"]);
+  EXPECT_EQ(384, stats2.values["master/mem_used"]);
+  EXPECT_EQ(2048, stats2.values["master/disk_used"]);
+  EXPECT_EQ(0, stats2.values["master/disk_revocable_used"]);
+  ASSERT_EQ(1u, stats2.values.count("slave/cpus_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/mem_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/disk_used"));
+  ASSERT_EQ(1u, stats2.values.count("slave/disk_revocable_used"));
+  EXPECT_EQ(2048, stats2.values["slave/disk_used"]);
+  EXPECT_EQ(0, stats2.values["slave/disk_revocable_used"]);
+
+  // Resume the clock so the terminating task and executor can be reaped.
+  Clock::resume();
+
+  driver1.stop();
+  driver1.join();
+
+  driver2.stop();
+  driver2.join();
+}
+
+
+// This test verifies that multiple frameworks belonging to the same role
+// can use the same shared persistent volume to launch tasks simultaneously.
 // It also verifies that metrics for used resources are populated on the
 // master and the agent.
 TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)