You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/05/29 03:02:47 UTC
[3/3] mesos git commit: Implemented 'updateSlave()' call in the
master.
Implemented 'updateSlave()' call in the master.
Review: https://reviews.apache.org/r/34736
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbf5c7e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbf5c7e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbf5c7e7
Branch: refs/heads/master
Commit: fbf5c7e703c691f8b8bcf20ea7c324e9987beab1
Parents: 949e6ad
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 27 16:07:59 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 53 ++++++++-
src/tests/oversubscription_tests.cpp | 172 +++++++++++++++++++++++++++++-
2 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d61b77b..710b814 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3463,11 +3463,52 @@ void Master::updateSlave(
{
++metrics->messages_update_slave;
- LOG(INFO) << "Received update of slave " << slaveId
- << " with oversubscribed resources " << oversubscribedResources;
+ if (slaves.removed.get(slaveId).isSome()) {
+ // If the slave is removed, we have already informed
+ // frameworks that its tasks were LOST, so the slave should
+ // shut down.
+ LOG(WARNING)
+ << "Ignoring update of slave with total oversubscribed resources "
+ << oversubscribedResources << " on removed slave " << slaveId
+ << " ; asking slave to shutdown";
+
+ ShutdownMessage message;
+ message.set_message("Update slave message from unknown slave");
+ reply(message);
+ return;
+ }
+
+ if (!slaves.registered.contains(slaveId)) {
+ LOG(WARNING)
+ << "Ignoring update of slave with total oversubscribed resources "
+ << oversubscribedResources << " on unknown slave " << slaveId;
+ return;
+ }
+
+ Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+ LOG(INFO) << "Received update of slave " << *slave << " with total"
+ << " oversubscribed resources " << oversubscribedResources;
+
+ // First, rescind any outstanding offers with revocable resources.
+ // NOTE: Need a copy of offers because the offers are removed inside
+ // the loop.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ const Resources offered = offer->resources();
+ if (!offered.revocable().empty()) {
+ LOG(INFO) << "Removing offer " << offer->id()
+ << " with revocable resources " << offered
+ << " on slave " << *slave;
+
+ allocator->recoverResources(
+ offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
+ removeOffer(offer, true); // Rescind.
+ }
+ }
- // TODO(vinod): Rescind any oustanding revocable offers from this
- // slave and update the allocator.
+ // Now, update the allocator with the new estimate.
+ allocator->updateSlave(slaveId, oversubscribedResources);
}
@@ -3984,6 +4025,10 @@ void Master::offer(const FrameworkID& frameworkId,
}
#endif // WITH_NETWORK_ISOLATOR
+ // TODO(vinod): Split regular and revocable resources into
+ // separate offers, so that rescinding offers with revocable
+ // resources does not affect offers with regular resources.
+
Offer* offer = new Offer();
offer->mutable_id()->MergeFrom(newOfferId());
offer->mutable_framework_id()->MergeFrom(framework->id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 1dda63e..ea5857c 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -16,15 +16,21 @@
* limitations under the License.
*/
+#include <string>
+#include <vector>
+
#include <gmock/gmock.h>
#include <mesos/resources.hpp>
#include <process/clock.hpp>
+#include <process/future.hpp>
#include <process/gtest.hpp>
#include <stout/gtest.hpp>
+#include "common/resources_utils.hpp"
+
#include "master/master.hpp"
#include "messages/messages.hpp"
@@ -41,11 +47,28 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
+using std::string;
+using std::vector;
+
namespace mesos {
namespace internal {
namespace tests {
-class OversubscriptionTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest
+{
+protected:
+ // TODO(vinod): Make this a global helper that other tests (e.g.,
+ // hierarchical allocator tests) can use.
+ Resources createRevocableResources(
+ const string& name,
+ const string& value,
+ const string& role = "*")
+ {
+ Resource resource = Resources::parse(name, value, role).get();
+ resource.mutable_revocable();
+ return resource;
+ }
+};
// This test verifies that slave will forward the estimation of the
@@ -79,7 +102,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
ASSERT_FALSE(update.isReady());
// Inject an estimation of oversubscribable resources.
- Resources resources = Resources::parse("cpus:1;mem:32").get();
+ Resources resources = createRevocableResources("cpus", "1");
resourceEstimator.estimate(resources);
AWAIT_READY(update);
@@ -97,6 +120,151 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
Shutdown();
}
+
+// This test verifies that a framework that desires revocable
+// resources gets an offer with revocable resources.
+TEST_F(OversubscriptionTest, RevocableOffer)
+{
+ // Start the master.
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start the slave with test resource estimator.
+ TestResourceEstimator resourceEstimator;
+ slave::Flags flags = CreateSlaveFlags();
+
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+ ASSERT_SOME(slave);
+
+ // Start the framework which desires revocable resources.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.add_capabilities()->set_type(
+ FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver.start();
+
+ // Initially the framework will get all regular resources.
+ AWAIT_READY(offers1);
+ EXPECT_NE(0u, offers1.get().size());
+ EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Inject an estimation of oversubscribable resources.
+ Resources resources = createRevocableResources("cpus", "1");
+ resourceEstimator.estimate(resources);
+
+ // Now the framework will get revocable resources.
+ AWAIT_READY(offers2);
+ EXPECT_NE(0u, offers2.get().size());
+ EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that when the master receives a new estimate for
+// oversubscribed resources it rescinds outstanding revocable offers.
+TEST_F(OversubscriptionTest, RescindRevocableOffer)
+{
+ // Start the master.
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start the slave with test resource estimator.
+ TestResourceEstimator resourceEstimator;
+ slave::Flags flags = CreateSlaveFlags();
+
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+ ASSERT_SOME(slave);
+
+ // Start the framework which desires revocable resources.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.add_capabilities()->set_type(
+ FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver.start();
+
+ // Initially the framework will get all regular resources.
+ AWAIT_READY(offers1);
+ EXPECT_NE(0u, offers1.get().size());
+ EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+ Future<vector<Offer>> offers2;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers2));
+
+ // Inject an estimation of oversubscribable resources.
+ Resources resources = createRevocableResources("cpus", "1");
+ resourceEstimator.estimate(resources);
+
+ // Now the framework will get revocable resources.
+ AWAIT_READY(offers2);
+ EXPECT_NE(0u, offers2.get().size());
+ EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+ Future<OfferID> offerId;
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .WillOnce(FutureArg<1>(&offerId));
+
+ Future<vector<Offer>> offers3;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers3))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Inject another estimation of oversubscribable resources while the
+ // previous revocable offer is outstanding.
+ Resources resources2 = createRevocableResources("cpus", "2");
+ resourceEstimator.estimate(resources2);
+
+ // Advance the clock for the slave to send the new estimate.
+ Clock::pause();
+ Clock::advance(flags.oversubscribed_resources_interval);
+ Clock::settle();
+
+ // The previous revocable offer should be rescinded.
+ AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
+
+ // Resume the clock for next allocation.
+ Clock::resume();
+
+ // The new offer should include the latest oversubscribed resources.
+ AWAIT_READY(offers3);
+ EXPECT_NE(0u, offers3.get().size());
+ EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {