You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/05/29 03:02:45 UTC

[1/3] mesos git commit: Updated slave to send total amount of oversubscribed resources.

Repository: mesos
Updated Branches:
  refs/heads/master 5c9529777 -> fbf5c7e70


Updated slave to send total amount of oversubscribed resources.

Review: https://reviews.apache.org/r/34729


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0df7bb09
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0df7bb09
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0df7bb09

Branch: refs/heads/master
Commit: 0df7bb09894235cac0dbf1dfdb0a23d2799d62e9
Parents: 5c95297
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 20 19:10:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:01 2015 -0700

----------------------------------------------------------------------
 src/messages/messages.proto          |  7 +++--
 src/slave/flags.cpp                  |  8 +++---
 src/slave/flags.hpp                  |  2 +-
 src/slave/slave.cpp                  | 46 +++++++++++++++++++++----------
 src/slave/slave.hpp                  |  8 ++++--
 src/tests/oversubscription_tests.cpp | 14 +++++-----
 6 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 39dac72..1c8d79e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -334,10 +334,11 @@ message CheckpointResourcesMessage {
 
 
 // This message is sent by the slave to the master to inform the
-// master about the currently oversubscribable resources.
-message OversubscribeResourcesMessage {
+// master about the total amount of oversubscribed (allocated and
+// allocatable) resources.
+message UpdateSlaveMessage {
   required SlaveID slave_id = 1;
-  repeated Resource resources = 2;
+  repeated Resource oversubscribed_resources = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index a8c7c49..6b7c61e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -467,10 +467,10 @@ mesos::internal::slave::Flags::Flags()
       "resource_estimator",
       "The name of the resource estimator to use for oversubscription.");
 
-  add(&Flags::oversubscribe_resources_interval,
-      "oversubscribe_resources_interval",
+  add(&Flags::oversubscribed_resources_interval,
+      "oversubscribed_resources_interval",
       "The slave periodically updates the master with the current estimation\n"
-      "about the maximum amount of resources that can be oversubscribed. The\n"
-      "interval between updates is controlled by this flag.",
+      "about the total amount of oversubscribed resources that are allocated\n"
+      "and available. The interval between updates is controlled by this flag.",
       Seconds(15));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 6ca59dc..944ed79 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -101,7 +101,7 @@ public:
   std::string authenticatee;
   Option<std::string> hooks;
   Option<std::string> resource_estimator;
-  Duration oversubscribe_resources_interval;
+  Duration oversubscribed_resources_interval;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b4d2029..fdaaea4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3982,7 +3982,7 @@ void Slave::__recover(const Future<Nothing>& future)
     // forward the estimations to the master.
     resourceEstimator->oversubscribable()
       .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
-      .onAny(defer(self(), &Self::forwardOversubscribableResources));
+      .onAny(defer(self(), &Self::forwardOversubscribedResources));
 
     // Start detecting masters.
     detection = detector->detect()
@@ -4090,34 +4090,50 @@ void Slave::updateOversubscribableResources(const Future<Resources>& future)
 }
 
 
-void Slave::forwardOversubscribableResources()
+void Slave::forwardOversubscribedResources()
 {
   if (state != RUNNING) {
-    delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
+    delay(Seconds(1), self(), &Self::forwardOversubscribedResources);
     return;
   }
 
-  // We only forward updates after the first estimation is received.
-  if (oversubscribableResources.isNone()) {
-    delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
-    return;
+  // Calculate the latest allocation of oversubscribed resources.
+  // Note that this allocation value might be different from the
+  // master's view because new task/executor might be in flight from
+  // the master or pending on the slave etc. This is ok because the
+  // allocator only considers the slave's view of allocation when
+  // calculating the available oversubscribed resources to offer.
+  Resources oversubscribed;
+  foreachvalue (Framework* framework, frameworks) {
+    foreachvalue (Executor* executor, framework->executors) {
+      oversubscribed += executor->resources.revocable();
+    }
   }
 
-  CHECK_SOME(master);
-  CHECK_SOME(oversubscribableResources);
+  // Add oversubscribable resources to the total.
+  oversubscribed += oversubscribableResources;
 
-  LOG(INFO) << "Forwarding oversubscribable resources "
-            << oversubscribableResources.get();
+  if (oversubscribed == oversubscribedResources) {
+    VLOG(1) << "Not forwarding total oversubscribed resources because the"
+            << " previous estimate " << oversubscribed << " hasn't changed";
+    return;
+  }
+
+  LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
 
-  OversubscribeResourcesMessage message;
+  UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
-  message.mutable_resources()->CopyFrom(oversubscribableResources.get());
+  message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
+  CHECK_SOME(master);
   send(master.get(), message);
 
-  delay(flags.oversubscribe_resources_interval,
+  delay(flags.oversubscribed_resources_interval,
         self(),
-        &Self::forwardOversubscribableResources);
+        &Self::forwardOversubscribedResources);
+
+  // Update the estimate.
+  oversubscribedResources = oversubscribed;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0207eaf..245ea06 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -434,7 +434,7 @@ private:
       const Executor* executor);
 
   void updateOversubscribableResources(const Future<Resources>& future);
-  void forwardOversubscribableResources();
+  void forwardOversubscribedResources();
 
   const Flags flags;
 
@@ -510,7 +510,11 @@ private:
 
   // The most recent estimation about the maximum amount of resources
   // that can be oversubscribed on the slave.
-  Option<Resources> oversubscribableResources;
+  Resources oversubscribableResources;
+
+  // The total amount of oversubscribed (allocated and
+  // oversubscribable) resources.
+  Resources oversubscribedResources;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0df7bb09/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 75c25b0..36a6793 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -48,8 +48,8 @@ class OversubscriptionSlaveTest : public MesosTest {};
 
 
 // This test verifies that slave will forward the estimation of the
-// oversubscribable resources to the master.
-TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
+// oversubscribed resources to the master.
+TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
 {
   Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -66,13 +66,13 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
 
   AWAIT_READY(slaveRegistered);
 
-  Future<OversubscribeResourcesMessage> update =
-    FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _);
+  Future<UpdateSlaveMessage> update =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
   Clock::pause();
 
   Clock::settle();
-  Clock::advance(flags.oversubscribe_resources_interval);
+  Clock::advance(flags.oversubscribed_resources_interval);
 
   ASSERT_FALSE(update.isReady());
 
@@ -81,10 +81,10 @@ TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
   resourceEstimator.estimate(resources);
 
   Clock::settle();
-  Clock::advance(flags.oversubscribe_resources_interval);
+  Clock::advance(flags.oversubscribed_resources_interval);
 
   AWAIT_READY(update);
-  EXPECT_EQ(Resources(update.get().resources()), resources);
+  EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources);
 
   Shutdown();
 }


[2/3] mesos git commit: Added 'updateSlave()' in master to handle oversubscribed resources.

Posted by vi...@apache.org.
Added 'updateSlave()' in master to handle oversubscribed resources.

Review: https://reviews.apache.org/r/34730


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/949e6ad1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/949e6ad1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/949e6ad1

Branch: refs/heads/master
Commit: 949e6ad1c6e24e3446c44519af28dd5f32e3c486
Parents: 0df7bb0
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 20 19:11:44 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp                | 19 +++++++++++++++++++
 src/master/master.hpp                |  4 ++++
 src/master/metrics.cpp               |  4 ++++
 src/master/metrics.hpp               |  1 +
 src/tests/oversubscription_tests.cpp | 20 ++++++++++++++------
 5 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1526f59..d61b77b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -718,6 +718,11 @@ void Master::initialize()
       &ExitedExecutorMessage::executor_id,
       &ExitedExecutorMessage::status);
 
+  install<UpdateSlaveMessage>(
+      &Master::updateSlave,
+      &UpdateSlaveMessage::slave_id,
+      &UpdateSlaveMessage::oversubscribed_resources);
+
   install<AuthenticateMessage>(
       &Master::authenticate,
       &AuthenticateMessage::pid);
@@ -3452,6 +3457,20 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 }
 
 
+void Master::updateSlave(
+    const SlaveID& slaveId,
+    const vector<Resource>& oversubscribedResources)
+{
+  ++metrics->messages_update_slave;
+
+  LOG(INFO) << "Received update of slave " << slaveId
+            << " with oversubscribed resources " <<  oversubscribedResources;
+
+  // TODO(vinod): Rescind any oustanding revocable offers from this
+  // slave and update the allocator.
+}
+
+
 // TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
 // because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)

http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c8c6251..c0cc293 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -750,6 +750,10 @@ public:
       const ExecutorID& executorId,
       int32_t status);
 
+  void updateSlave(
+      const SlaveID& slaveId,
+      const std::vector<Resource>& oversubscribedResources);
+
   void shutdownSlave(
       const SlaveID& slaveId,
       const std::string& message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index ee09664..264252c 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -117,6 +117,8 @@ Metrics::Metrics(const Master& master)
         "master/messages_status_update"),
     messages_exited_executor(
         "master/messages_exited_executor"),
+    messages_update_slave(
+        "master/messages_update_slave"),
     messages_authenticate(
         "master/messages_authenticate"),
     valid_framework_to_executor_messages(
@@ -208,6 +210,7 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(messages_unregister_slave);
   process::metrics::add(messages_status_update);
   process::metrics::add(messages_exited_executor);
+  process::metrics::add(messages_update_slave);
 
   // Messages from both schedulers and slaves.
   process::metrics::add(messages_authenticate);
@@ -314,6 +317,7 @@ Metrics::~Metrics()
   process::metrics::remove(messages_unregister_slave);
   process::metrics::remove(messages_status_update);
   process::metrics::remove(messages_exited_executor);
+  process::metrics::remove(messages_update_slave);
 
   // Messages from both schedulers and slaves.
   process::metrics::remove(messages_authenticate);

http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 78d0666..833033c 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -138,6 +138,7 @@ struct Metrics
   process::metrics::Counter messages_unregister_slave;
   process::metrics::Counter messages_status_update;
   process::metrics::Counter messages_exited_executor;
+  process::metrics::Counter messages_update_slave;
 
   // Messages from both schedulers and slaves.
   process::metrics::Counter messages_authenticate;

http://git-wip-us.apache.org/repos/asf/mesos/blob/949e6ad1/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 36a6793..1dda63e 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -33,6 +33,7 @@
 #include "slave/slave.hpp"
 
 #include "tests/mesos.hpp"
+#include "tests/utils.hpp"
 
 using namespace process;
 
@@ -44,12 +45,12 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
-class OversubscriptionSlaveTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest {};
 
 
 // This test verifies that slave will forward the estimation of the
 // oversubscribed resources to the master.
-TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
+TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
 {
   Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -71,8 +72,9 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
 
   Clock::pause();
 
-  Clock::settle();
+  // No update should be sent until there is an estimate.
   Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
 
   ASSERT_FALSE(update.isReady());
 
@@ -80,12 +82,18 @@ TEST_F(OversubscriptionSlaveTest, ForwardUpdateSlaveMessage)
   Resources resources = Resources::parse("cpus:1;mem:32").get();
   resourceEstimator.estimate(resources);
 
-  Clock::settle();
-  Clock::advance(flags.oversubscribed_resources_interval);
-
   AWAIT_READY(update);
   EXPECT_EQ(Resources(update.get().oversubscribed_resources()), resources);
 
+  // Ensure the metric is updated.
+  JSON::Object metrics = Metrics();
+  ASSERT_EQ(
+      1u,
+      metrics.values.count("master/messages_update_slave"));
+  ASSERT_EQ(
+      1u,
+      metrics.values["master/messages_update_slave"]);
+
   Shutdown();
 }
 


[3/3] mesos git commit: Implemented 'updateSlave()' call in the master.

Posted by vi...@apache.org.
Implemented 'updateSlave()' call in the master.

Review: https://reviews.apache.org/r/34736


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbf5c7e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbf5c7e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbf5c7e7

Branch: refs/heads/master
Commit: fbf5c7e703c691f8b8bcf20ea7c324e9987beab1
Parents: 949e6ad
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 27 16:07:59 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp                |  53 ++++++++-
 src/tests/oversubscription_tests.cpp | 172 +++++++++++++++++++++++++++++-
 2 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d61b77b..710b814 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3463,11 +3463,52 @@ void Master::updateSlave(
 {
   ++metrics->messages_update_slave;
 
-  LOG(INFO) << "Received update of slave " << slaveId
-            << " with oversubscribed resources " <<  oversubscribedResources;
+  if (slaves.removed.get(slaveId).isSome()) {
+    // If the slave is removed, we have already informed
+    // frameworks that its tasks were LOST, so the slave should
+    // shut down.
+    LOG(WARNING)
+      << "Ignoring update of slave with total oversubscribed resources "
+      << oversubscribedResources << " on removed slave " << slaveId
+      << " ; asking slave to shutdown";
+
+    ShutdownMessage message;
+    message.set_message("Update slave message from unknown slave");
+    reply(message);
+    return;
+  }
+
+  if (!slaves.registered.contains(slaveId)) {
+    LOG(WARNING)
+      << "Ignoring update of slave with total oversubscribed resources "
+      << oversubscribedResources << " on unknown slave " << slaveId;
+    return;
+  }
+
+  Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+  LOG(INFO) << "Received update of slave " << *slave << " with total"
+            << " oversubscribed resources " <<  oversubscribedResources;
+
+  // First, rescind any oustanding offers with revocable resources.
+  // NOTE: Need a copy of offers because the offers are removed inside
+  // the loop.
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    const Resources offered = offer->resources();
+    if (!offered.revocable().empty()) {
+      LOG(INFO) << "Removing offer " << offer->id()
+                << " with revocable resources " << offered
+                << " on slave " << *slave;
+
+      allocator->recoverResources(
+          offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
+      removeOffer(offer, true); // Rescind.
+    }
+  }
 
-  // TODO(vinod): Rescind any oustanding revocable offers from this
-  // slave and update the allocator.
+  // Now, update the allocator with the new estimate.
+  allocator->updateSlave(slaveId, oversubscribedResources);
 }
 
 
@@ -3984,6 +4025,10 @@ void Master::offer(const FrameworkID& frameworkId,
     }
 #endif // WITH_NETWORK_ISOLATOR
 
+    // TODO(vinod): Split regular and revocable resources into
+    // separate offers, so that rescinding offers with revocable
+    // resources does not affect offers with regular resources.
+
     Offer* offer = new Offer();
     offer->mutable_id()->MergeFrom(newOfferId());
     offer->mutable_framework_id()->MergeFrom(framework->id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 1dda63e..ea5857c 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -16,15 +16,21 @@
  * limitations under the License.
  */
 
+#include <string>
+#include <vector>
+
 #include <gmock/gmock.h>
 
 #include <mesos/resources.hpp>
 
 #include <process/clock.hpp>
+#include <process/future.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
 
+#include "common/resources_utils.hpp"
+
 #include "master/master.hpp"
 
 #include "messages/messages.hpp"
@@ -41,11 +47,28 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
 
+using std::string;
+using std::vector;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
-class OversubscriptionTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest
+{
+protected:
+  // TODO(vinod): Make this a global helper that other tests (e.g.,
+  // hierarchical allocator tests) can use.
+  Resources createRevocableResources(
+      const string& name,
+      const string& value,
+      const string& role = "*")
+  {
+    Resource resource = Resources::parse(name, value, role).get();
+    resource.mutable_revocable();
+    return resource;
+  }
+};
 
 
 // This test verifies that slave will forward the estimation of the
@@ -79,7 +102,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   ASSERT_FALSE(update.isReady());
 
   // Inject an estimation of oversubscribable resources.
-  Resources resources = Resources::parse("cpus:1;mem:32").get();
+  Resources resources = createRevocableResources("cpus", "1");
   resourceEstimator.estimate(resources);
 
   AWAIT_READY(update);
@@ -97,6 +120,151 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   Shutdown();
 }
 
+
+// This test verifies that a framework that desires revocable
+// resources gets an offer with revocable resources.
+TEST_F(OversubscriptionTest, RevocableOffer)
+{
+  // Start the master.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start the slave with test resource estimator.
+  TestResourceEstimator resourceEstimator;
+  slave::Flags flags = CreateSlaveFlags();
+
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+  ASSERT_SOME(slave);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  // Initially the framework will get all regular resources.
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources = createRevocableResources("cpus", "1");
+  resourceEstimator.estimate(resources);
+
+  // Now the framework will get revocable resources.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that when the master receives a new estimate for
+// oversubscribed resources it rescinds outstanding revocable offers.
+TEST_F(OversubscriptionTest, RescindRevocableOffer)
+{
+  // Start the master.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start the slave with test resource estimator.
+  TestResourceEstimator resourceEstimator;
+  slave::Flags flags = CreateSlaveFlags();
+
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+  ASSERT_SOME(slave);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  // Initially the framework will get all regular resources.
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2));
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources = createRevocableResources("cpus", "1");
+  resourceEstimator.estimate(resources);
+
+  // Now the framework will get revocable resources.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+  Future<OfferID> offerId;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureArg<1>(&offerId));
+
+  Future<vector<Offer>> offers3;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers3))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Inject another estimation of oversubscribable resources while the
+  // previous revocable offer is oustanding.
+  Resources resources2 = createRevocableResources("cpus", "2");
+  resourceEstimator.estimate(resources2);
+
+  // Advance the clock for the slave to send the new estimate.
+  Clock::pause();
+  Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
+
+  // The previous revocable offer should be rescinded.
+  AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
+
+  // Resume the clock for next allocation.
+  Clock::resume();
+
+  // The new offer should include the latest oversubscribed resources.
+  AWAIT_READY(offers3);
+  EXPECT_NE(0u, offers3.get().size());
+  EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {