You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/05/29 03:02:45 UTC
[1/3] mesos git commit: Updated slave to send total amount of
oversubscribed resources.
Repository: mesos
Updated Branches:
refs/heads/master 5c9529777 -> fbf5c7e70
Updated slave to send total amount of oversubscribed resources.
Review: https://reviews.apache.org/r/34729
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0df7bb09
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0df7bb09
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0df7bb09
Branch: refs/heads/master
Commit: 0df7bb09894235cac0dbf1dfdb0a23d2799d62e9
Parents: 5c95297
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 20 19:10:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:01 2015 -0700
----------------------------------------------------------------------
src/messages/messages.proto | 7 +++--
src/slave/flags.cpp | 8 +++---
src/slave/flags.hpp | 2 +-
src/slave/slave.cpp | 46 +++++++++++++++++++++----------
src/slave/slave.hpp | 8 ++++--
src/tests/oversubscription_tests.cpp | 14 +++++-----
6 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 39dac72..1c8d79e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -334,10 +334,11 @@ message CheckpointResourcesMessage {
// This message is sent by the slave to the master to inform the
-// master about the currently oversubscribable resources.
-message OversubscribeResourcesMessage {
+// master about the total amount of oversubscribed (allocated and
+// allocatable) resources.
+message UpdateSlaveMessage {
required SlaveID slave_id = 1;
- repeated Resource resources = 2;
+ repeated Resource oversubscribed_resources = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index a8c7c49..6b7c61e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -467,10 +467,10 @@ mesos::internal::slave::Flags::Flags()
"resource_estimator",
"The name of the resource estimator to use for oversubscription.");
- add(&Flags::oversubscribe_resources_interval,
- "oversubscribe_resources_interval",
+ add(&Flags::oversubscribed_resources_interval,
+ "oversubscribed_resources_interval",
"The slave periodically updates the master with the current estimation\n"
- "about the maximum amount of resources that can be oversubscribed. The\n"
- "interval between updates is controlled by this flag.",
+ "about the total amount of oversubscribed resources that are allocated\n"
+ "and available. The interval between updates is controlled by this flag.",
Seconds(15));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6ca59dc..944ed79 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -101,7 +101,7 @@ public:
std::string authenticatee;
Option<std::string> hooks;
Option<std::string> resource_estimator;
- Duration oversubscribe_resources_interval;
+ Duration oversubscribed_resources_interval;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b4d2029..fdaaea4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3982,7 +3982,7 @@ void Slave::__recover(const Future<Nothing>& future)
// forward the estimations to the master.
resourceEstimator->oversubscribable()
.onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
- .onAny(defer(self(), &Self::forwardOversubscribableResources));
+ .onAny(defer(self(), &Self::forwardOversubscribedResources));
// Start detecting masters.
detection = detector->detect()
@@ -4090,34 +4090,50 @@ void Slave::updateOversubscribableResources(const Future<Resources>& future)
}
-void Slave::forwardOversubscribableResources()
+void Slave::forwardOversubscribedResources()
{
if (state != RUNNING) {
- delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
+ delay(Seconds(1), self(), &Self::forwardOversubscribedResources);
return;
}
- // We only forward updates after the first estimation is received.
- if (oversubscribableResources.isNone()) {
- delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
- return;
+ // Calculate the latest allocation of oversubscribed resources.
+ // Note that this allocation value might be different from the
+ // master's view because new task/executor might be in flight from
+ // the master or pending on the slave etc. This is ok because the
+ // allocator only considers the slave's view of allocation when
+ // calculating the available oversubscribed resources to offer.
+ Resources oversubscribed;
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ oversubscribed += executor->resources.revocable();
+ }
}
- CHECK_SOME(master);
- CHECK_SOME(oversubscribableResources);
+ // Add oversubscribable resources to the total.
+ oversubscribed += oversubscribableResources;
- LOG(INFO) << "Forwarding oversubscribable resources "
- << oversubscribableResources.get();
+ if (oversubscribed == oversubscribedResources) {
+ VLOG(1) << "Not forwarding total oversubscribed resources because the"
+ << " previous estimate " << oversubscribed << " hasn't changed";
+ return;
+ }
+
+ LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
- OversubscribeResourcesMessage message;
+ UpdateSlaveMessage message;
message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resources()->CopyFrom(oversubscribableResources.get());
+ message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+ CHECK_SOME(master);
send(master.get(), message);
- delay(flags.oversubscribe_resources_interval,
+ delay(flags.oversubscribed_resources_interval,
self(),
- &Self::forwardOversubscribableResources);
+ &Self::forwardOversubscribedResources);
+
+ // Update the estimate.
+ oversubscribedResources = oversubscribed;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0207eaf..245ea06 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -434,7 +434,7 @@ private:
const Executor* executor);
void updateOversubscribableResources(const Future<Resources>& future);
- void forwardOversubscribableResources();
+ void forwardOversubscribedResources();
const Flags flags;
@@ -510,7 +510,11 @@ private:
// The most recent estimation about the maximum amount of resources
// that can be oversubscribed on the slave.
- Option<Resources> oversubscribableResources;
+ Resources oversubscribableResources;
+
+ // The total amount of oversubscribed (allocated and
+ // oversubscribable) resources.
+ Resources oversubscribedResources;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 75c25b0..36a6793 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -48,8 +48,8 @@ class OversubscriptionSlaveTest : public MesosTest {};
// This test verifies that slave will forward the estimation of the
-// oversubscribable resources to the master.
-TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
+// oversubscribed resources to the master.
+TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
{
Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -66,13 +66,13 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
AWAIT_READY(slaveRegistered);
- Future<OversubscribeResourcesMessage> update =
- FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _);
+ Future<UpdateSlaveMessage> update =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Clock::pause();
Clock::settle();
- Clock::advance(flags.oversubscribe_resources_interval);
+ Clock::advance(flags.oversubscribed_resources_interval);
ASSERT_FALSE(update.isReady());
@@ -81,10 +81,10 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
resourceEstimator.estimate(resources);
Clock::settle();
- Clock::advance(flags.oversubscribe_resources_interval);
+ Clock::advance(flags.oversubscribed_resources_interval);
AWAIT_READY(update);
- EXPECT_EQ(Resources(update.get().resources()), resources);
+ EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources);
Shutdown();
}
[2/3] mesos git commit: Added 'updateSlave()' in master to handle
oversubscribed resources.
Posted by vi...@apache.org.
Added 'updateSlave()' in master to handle oversubscribed resources.
Review: https://reviews.apache.org/r/34730
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/949e6ad1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/949e6ad1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/949e6ad1
Branch: refs/heads/master
Commit: 949e6ad1c6e24e3446c44519af28dd5f32e3c486
Parents: 0df7bb0
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 20 19:11:44 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 19 +++++++++++++++++++
src/master/master.hpp | 4 ++++
src/master/metrics.cpp | 4 ++++
src/master/metrics.hpp | 1 +
src/tests/oversubscription_tests.cpp | 20 ++++++++++++++------
5 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1526f59..d61b77b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -718,6 +718,11 @@ void Master::initialize()
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::status);
+ install<UpdateSlaveMessage>(
+ &Master::updateSlave,
+ &UpdateSlaveMessage::slave_id,
+ &UpdateSlaveMessage::oversubscribed_resources);
+
install<AuthenticateMessage>(
&Master::authenticate,
&AuthenticateMessage::pid);
@@ -3452,6 +3457,20 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
}
+void Master::updateSlave(
+ const SlaveID& slaveId,
+ const vector<Resource>& oversubscribedResources)
+{
+ ++metrics->messages_update_slave;
+
+ LOG(INFO) << "Received update of slave " << slaveId
+ << " with oversubscribed resources " << oversubscribedResources;
+
+ // TODO(vinod): Rescind any oustanding revocable offers from this
+ // slave and update the allocator.
+}
+
+
// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
// because the status updates will be sent by the slave.
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c8c6251..c0cc293 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -750,6 +750,10 @@ public:
const ExecutorID& executorId,
int32_t status);
+ void updateSlave(
+ const SlaveID& slaveId,
+ const std::vector<Resource>& oversubscribedResources);
+
void shutdownSlave(
const SlaveID& slaveId,
const std::string& message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index ee09664..264252c 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -117,6 +117,8 @@ Metrics::Metrics(const Master& master)
"master/messages_status_update"),
messages_exited_executor(
"master/messages_exited_executor"),
+ messages_update_slave(
+ "master/messages_update_slave"),
messages_authenticate(
"master/messages_authenticate"),
valid_framework_to_executor_messages(
@@ -208,6 +210,7 @@ Metrics::Metrics(const Master& master)
process::metrics::add(messages_unregister_slave);
process::metrics::add(messages_status_update);
process::metrics::add(messages_exited_executor);
+ process::metrics::add(messages_update_slave);
// Messages from both schedulers and slaves.
process::metrics::add(messages_authenticate);
@@ -314,6 +317,7 @@ Metrics::~Metrics()
process::metrics::remove(messages_unregister_slave);
process::metrics::remove(messages_status_update);
process::metrics::remove(messages_exited_executor);
+ process::metrics::remove(messages_update_slave);
// Messages from both schedulers and slaves.
process::metrics::remove(messages_authenticate);
http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 78d0666..833033c 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -138,6 +138,7 @@ struct Metrics
process::metrics::Counter messages_unregister_slave;
process::metrics::Counter messages_status_update;
process::metrics::Counter messages_exited_executor;
+ process::metrics::Counter messages_update_slave;
// Messages from both schedulers and slaves.
process::metrics::Counter messages_authenticate;
http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 36a6793..1dda63e 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -33,6 +33,7 @@
#include "slave/slave.hpp"
#include "tests/mesos.hpp"
+#include "tests/utils.hpp"
using namespace process;
@@ -44,12 +45,12 @@ namespace mesos {
namespace internal {
namespace tests {
-class OversubscriptionSlaveTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest {};
// This test verifies that slave will forward the estimation of the
// oversubscribed resources to the master.
-TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
+TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
{
Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -71,8 +72,9 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
Clock::pause();
- Clock::settle();
+ // No update should be sent until there is an estimate.
Clock::advance(flags.oversubscribed_resources_interval);
+ Clock::settle();
ASSERT_FALSE(update.isReady());
@@ -80,12 +82,18 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
Resources resources = Resources::parse("cpus:1;mem:32").get();
resourceEstimator.estimate(resources);
- Clock::settle();
- Clock::advance(flags.oversubscribed_resources_interval);
-
AWAIT_READY(update);
EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources);
+ // Ensure the metric is updated.
+ JSON::Object metrics = Metrics();
+ ASSERT_EQ(
+ 1u,
+ metrics.values.count("master/messages_update_slave"));
+ ASSERT_EQ(
+ 1u,
+ metrics.values["master/messages_update_slave"]);
+
Shutdown();
}
[3/3] mesos git commit: Implemented 'updateSlave()' call in the
master.
Posted by vi...@apache.org.
Implemented 'updateSlave()' call in the master.
Review: https://reviews.apache.org/r/34736
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbf5c7e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbf5c7e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbf5c7e7
Branch: refs/heads/master
Commit: fbf5c7e703c691f8b8bcf20ea7c324e9987beab1
Parents: 949e6ad
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 27 16:07:59 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 53 ++++++++-
src/tests/oversubscription_tests.cpp | 172 +++++++++++++++++++++++++++++-
2 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d61b77b..710b814 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3463,11 +3463,52 @@ void Master::updateSlave(
{
++metrics->messages_update_slave;
- LOG(INFO) << "Received update of slave " << slaveId
- << " with oversubscribed resources " << oversubscribedResources;
+ if (slaves.removed.get(slaveId).isSome()) {
+ // If the slave is removed, we have already informed
+ // frameworks that its tasks were LOST, so the slave should
+ // shut down.
+ LOG(WARNING)
+ << "Ignoring update of slave with total oversubscribed resources "
+ << oversubscribedResources << " on removed slave " << slaveId
+ << " ; asking slave to shutdown";
+
+ ShutdownMessage message;
+ message.set_message("Update slave message from unknown slave");
+ reply(message);
+ return;
+ }
+
+ if (!slaves.registered.contains(slaveId)) {
+ LOG(WARNING)
+ << "Ignoring update of slave with total oversubscribed resources "
+ << oversubscribedResources << " on unknown slave " << slaveId;
+ return;
+ }
+
+ Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+ LOG(INFO) << "Received update of slave " << *slave << " with total"
+ << " oversubscribed resources " << oversubscribedResources;
+
+ // First, rescind any outstanding offers with revocable resources.
+ // NOTE: Need a copy of offers because the offers are removed inside
+ // the loop.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ const Resources offered = offer->resources();
+ if (!offered.revocable().empty()) {
+ LOG(INFO) << "Removing offer " << offer->id()
+ << " with revocable resources " << offered
+ << " on slave " << *slave;
+
+ allocator->recoverResources(
+ offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
+ removeOffer(offer, true); // Rescind.
+ }
+ }
- // TODO(vinod): Rescind any oustanding revocable offers from this
- // slave and update the allocator.
+ // Now, update the allocator with the new estimate.
+ allocator->updateSlave(slaveId, oversubscribedResources);
}
@@ -3984,6 +4025,10 @@ void Master::offer(const FrameworkID& frameworkId,
}
#endif // WITH_NETWORK_ISOLATOR
+ // TODO(vinod): Split regular and revocable resources into
+ // separate offers, so that rescinding offers with revocable
+ // resources does not affect offers with regular resources.
+
Offer* offer = new Offer();
offer->mutable_id()->MergeFrom(newOfferId());
offer->mutable_framework_id()->MergeFrom(framework->id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 1dda63e..ea5857c 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -16,15 +16,21 @@
* limitations under the License.
*/
+#include <string>
+#include <vector>
+
#include <gmock/gmock.h>
#include <mesos/resources.hpp>
#include <process/clock.hpp>
+#include <process/future.hpp>
#include <process/gtest.hpp>
#include <stout/gtest.hpp>
+#include "common/resources_utils.hpp"
+
#include "master/master.hpp"
#include "messages/messages.hpp"
@@ -41,11 +47,28 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
+using std::string;
+using std::vector;
+
namespace mesos {
namespace internal {
namespace tests {
-class OversubscriptionTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest
+{
+protected:
+ // TODO(vinod): Make this a global helper that other tests (e.g.,
+ // hierarchical allocator tests) can use.
+ Resources createRevocableResources(
+ const string& name,
+ const string& value,
+ const string& role = "*")
+ {
+ Resource resource = Resources::parse(name, value, role).get();
+ resource.mutable_revocable();
+ return resource;
+ }
+};
// This test verifies that slave will forward the estimation of the
@@ -79,7 +102,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
ASSERT_FALSE(update.isReady());
// Inject an estimation of oversubscribable resources.
- Resources resources = Resources::parse("cpus:1;mem:32").get();
+ Resources resources = createRevocableResources("cpus", "1");
resourceEstimator.estimate(resources);
AWAIT_READY(update);
@@ -97,6 +120,151 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
Shutdown();
}
+
+// This test verifies that a framework that desires revocable
+// resources gets an offer with revocable resources.
+TEST_F(OversubscriptionTest, RevocableOffer)
+{
+ // Start the master.
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start the slave with test resource estimator.
+ TestResourceEstimator resourceEstimator;
+ slave::Flags flags = CreateSlaveFlags();
+
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+ ASSERT_SOME(slave);
+
+ // Start the framework which desires revocable resources.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.add_capabilities()->set_type(
+ FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver.start();
+
+ // Initially the framework will get all regular resources.
+ AWAIT_READY(offers1);
+ EXPECT_NE(0u, offers1.get().size());
+ EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Inject an estimation of oversubscribable resources.
+ Resources resources = createRevocableResources("cpus", "1");
+ resourceEstimator.estimate(resources);
+
+ // Now the framework will get revocable resources.
+ AWAIT_READY(offers2);
+ EXPECT_NE(0u, offers2.get().size());
+ EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that when the master receives a new estimate for
+// oversubscribed resources it rescinds outstanding revocable offers.
+TEST_F(OversubscriptionTest, RescindRevocableOffer)
+{
+ // Start the master.
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start the slave with test resource estimator.
+ TestResourceEstimator resourceEstimator;
+ slave::Flags flags = CreateSlaveFlags();
+
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+ ASSERT_SOME(slave);
+
+ // Start the framework which desires revocable resources.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.add_capabilities()->set_type(
+ FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver.start();
+
+ // Initially the framework will get all regular resources.
+ AWAIT_READY(offers1);
+ EXPECT_NE(0u, offers1.get().size());
+ EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers2));
+
+ // Inject an estimation of oversubscribable resources.
+ Resources resources = createRevocableResources("cpus", "1");
+ resourceEstimator.estimate(resources);
+
+ // Now the framework will get revocable resources.
+ AWAIT_READY(offers2);
+ EXPECT_NE(0u, offers2.get().size());
+ EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+ Future<OfferID> offerId;
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .WillOnce(FutureArg<1>(&offerId));
+
+ Future<vector<Offer>> offers3;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers3))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Inject another estimation of oversubscribable resources while the
+ // previous revocable offer is outstanding.
+ Resources resources2 = createRevocableResources("cpus", "2");
+ resourceEstimator.estimate(resources2);
+
+ // Advance the clock for the slave to send the new estimate.
+ Clock::pause();
+ Clock::advance(flags.oversubscribed_resources_interval);
+ Clock::settle();
+
+ // The previous revocable offer should be rescinded.
+ AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
+
+ // Resume the clock for next allocation.
+ Clock::resume();
+
+ // The new offer should include the latest oversubscribed resources.
+ AWAIT_READY(offers3);
+ EXPECT_NE(0u, offers3.get().size());
+ EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {