You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2015/09/10 00:59:21 UTC
[1/3] mesos git commit: Added tests for /reserve and /unreserve HTTP
endpoints.
Repository: mesos
Updated Branches:
refs/heads/master e758d2460 -> d0d15c5da
Added tests for /reserve and /unreserve HTTP endpoints.
Review: https://reviews.apache.org/r/35984
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0d15c5d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0d15c5d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0d15c5d
Branch: refs/heads/master
Commit: d0d15c5dafeec9b70c53454ebbeb664fb2f66d69
Parents: cc9c682
Author: Michael Park <mp...@apache.org>
Authored: Wed Aug 5 02:05:07 2015 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Sep 9 15:28:29 2015 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/reservation_endpoints_tests.cpp | 915 +++++++++++++++++++++++++
2 files changed, 916 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0d15c5d/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 4ef58cd..cea470e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1667,6 +1667,7 @@ mesos_tests_SOURCES = \
tests/registrar_tests.cpp \
tests/repair_tests.cpp \
tests/reservation_tests.cpp \
+ tests/reservation_endpoints_tests.cpp \
tests/resource_offers_tests.cpp \
tests/resources_tests.cpp \
tests/scheduler_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0d15c5d/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
new file mode 100644
index 0000000..795f1cf
--- /dev/null
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -0,0 +1,915 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/http.hpp>
+#include <process/pid.hpp>
+
+#include <stout/base64.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "tests/mesos.hpp"
+#include "tests/utils.hpp"
+
+using std::string;
+using std::vector;
+
+using mesos::internal::master::Master;
+using mesos::internal::slave::Slave;
+
+using process::Future;
+using process::PID;
+
+using process::http::BadRequest;
+using process::http::Conflict;
+using process::http::OK;
+using process::http::Response;
+using process::http::Unauthorized;
+
+using testing::_;
+using testing::DoAll;
+using testing::Eq;
+using testing::SaveArg;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+
+// Builds a 'JSON::Array' holding one 'JSON::Protobuf' element for each
+// 'Resource' in the given 'RepeatedPtrField<Resource>', in the same order.
+// TODO(mpark): Generalize this to 'JSON::protobuf(RepeatedPtrField<T>)'.
+JSON::Array toJSONArray(
+    const google::protobuf::RepeatedPtrField<Resource>& resources)
+{
+  JSON::Array result;
+
+  // The number of elements is known up front.
+  result.values.reserve(resources.size());
+
+  for (const Resource& item : resources) {
+    result.values.push_back(JSON::Protobuf(item));
+  }
+
+  return result;
+}
+
+
+// Test fixture for the '/reserve' and '/unreserve' HTTP endpoint tests. It
+// supplies the master flags and framework info shared by the tests, plus
+// helpers that build the headers and body of the requests they send.
+class ReservationEndpointsTest : public MesosTest
+{
+public:
+  // Returns the default test master flags with a 50ms allocation interval
+  // and with 'roles' set to the role of the framework returned by
+  // 'createFrameworkInfo', so that such a framework is allowed to register.
+  virtual master::Flags CreateMasterFlags()
+  {
+    master::Flags masterFlags = MesosTest::CreateMasterFlags();
+    masterFlags.roles = createFrameworkInfo().role();
+    masterFlags.allocation_interval = Milliseconds(50);
+    return masterFlags;
+  }
+
+  // Returns a copy of 'DEFAULT_FRAMEWORK_INFO' whose role is "role".
+  FrameworkInfo createFrameworkInfo()
+  {
+    FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+    frameworkInfo.set_role("role");
+    return frameworkInfo;
+  }
+
+  // Returns a header map with a single "Authorization" entry of the form
+  // "Basic <base64(principal:secret)>" for the given credential.
+  hashmap<string, string> createBasicAuthHeaders(
+      const Credential& credential) const
+  {
+    const string token =
+      base64::encode(credential.principal() + ":" + credential.secret());
+
+    hashmap<string, string> headers;
+    headers["Authorization"] = "Basic " + token;
+
+    return headers;
+  }
+
+  // Returns a request body of the form "slaveId=<id>&resources=<json>",
+  // where <json> is the JSON array representation of 'resources'.
+  string createRequestBody(
+      const SlaveID& slaveId, const Resources& resources) const
+  {
+    const string json = stringify(toJSONArray(resources));
+
+    return strings::format(
+        "slaveId=%s&resources=%s",
+        slaveId.value(),
+        json).get();
+  }
+};
+
+
+// TODO(mpark): Add tests for ACLs once they are introduced.
+
+
+// This tests that an operator can reserve/unreserve available resources.
+//
+// The resources are reserved through '/reserve' before any framework is
+// registered. A framework in the reserved role then checks that its offer
+// contains them and declines it. Finally the same resources are unreserved
+// through '/unreserve' and the next offer is checked to contain them
+// unreserved.
+TEST_F(ReservationEndpointsTest, AvailableResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+
+  // The filter to decline the offer "forever".
+  Filters filtersForever;
+  filtersForever.set_refuse_seconds(1000);
+
+  // The expectation must be in place before the offer is declined;
+  // otherwise the resulting 'recoverResources' call could arrive first and
+  // the future below would never be satisfied.
+  Future<Nothing> recoverResources;
+  EXPECT_CALL(allocator, recoverResources(_, _, _, _))
+    .WillOnce(DoAll(InvokeRecoverResources(&allocator),
+                    FutureSatisfy(&recoverResources)));
+
+  // Decline the offer "forever" in order to deallocate resources.
+  driver.declineOffer(offer.id(), filtersForever);
+
+  AWAIT_READY(recoverResources);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  response = process::http::post(
+      master.get(),
+      "unreserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This tests that an operator can reserve offered resources by rescinding the
+// outstanding offers.
+//
+// A framework first receives an offer containing the unreserved resources.
+// While that offer is outstanding, the resources are reserved through
+// '/reserve'; the test expects the offer to be rescinded and a new offer
+// containing the reserved resources to arrive.
+TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  // Both expectations are installed before the request is sent, since the
+  // rescind and the new offer are consequences of it.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Expect an offer to be rescinded!
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This tests that an operator can unreserve offered resources by rescinding the
+// outstanding offers.
+//
+// The resources are reserved through '/reserve' and then offered to a
+// framework in the reserved role. While that offer is outstanding, they are
+// unreserved through '/unreserve'; the test expects the offer to be rescinded
+// and a new offer containing the unreserved resources to arrive.
+TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+
+  // Both expectations are installed before the request is sent, since the
+  // rescind and the new offer are consequences of it.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Expect an offer to be rescinded!
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  response = process::http::post(
+      master.get(),
+      "unreserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This tests that an operator can reserve a mix of available and offered
+// resources by rescinding the outstanding offers.
+//
+// The allocation interval is set to 1000 seconds, so offers are only
+// triggered explicitly through 'reviveOffers'. Every future produced by a
+// mock action is awaited before it is read, so that a missing event fails
+// the test instead of leaving it stuck.
+TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  master::Flags masterFlags = CreateMasterFlags();
+  // Turn off allocation. We're doing it manually.
+  masterFlags.allocation_interval = Seconds(1000);
+
+  Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources available = Resources::parse("cpus:1;mem:128").get();
+  Resources offered = Resources::parse("mem:384").get();
+
+  Resources total = available + offered;
+  Resources dynamicallyReserved = total.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // We want to get the cluster in a state where 'available' resources are left
+  // in the allocator, and 'offered' resources are offered to the framework.
+  // To achieve this state, we perform the following steps:
+  //   (1) Summon an offer containing 'total' = 'available' + 'offered'.
+  //   (2) Launch a "forever-running" task with 'available' resources.
+  //   (3) Summon an offer containing 'offered'.
+  //   (4) Kill the task, which recovers 'available' resources.
+
+  // Summon an offer and expect to receive 'available + offered' resources.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(available + offered));
+
+  // Launch a task on the 'available' resources portion of the offer, which
+  // recovers 'offered' resources portion.
+  TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
+
+  // Expect a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> recoverUnusedResources;
+  EXPECT_CALL(allocator, recoverResources(_, _, _, _))
+    .WillOnce(DoAll(InvokeRecoverResources(&allocator),
+                    FutureSatisfy(&recoverUnusedResources)));
+
+  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
+
+  // Wait for TASK_RUNNING update ack and for the resources to be recovered.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(recoverUnusedResources);
+
+  // Summon an offer to receive the 'offered' resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(offered));
+
+  // Kill the task running on 'available' resources to make it available.
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  // Wait for the used resources to be recovered.
+  Future<Resources> availableResources;
+  EXPECT_CALL(allocator, recoverResources(_, _, _, _))
+    .WillOnce(DoAll(InvokeRecoverResources(&allocator),
+                    FutureArg<2>(&availableResources)))
+    .WillRepeatedly(DoDefault());
+
+  // Send a KillTask message to the master.
+  driver.killTask(taskInfo.task_id());
+
+  AWAIT_READY(availableResources);
+
+  EXPECT_TRUE(availableResources.get().contains(available));
+
+  // At this point, we have 'available' resources in the allocator, and
+  // 'offered' resources offered to the framework.
+
+  // Expect an offer to be rescinded!
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Summon an offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This tests that an operator can unreserve a mix of available and offered
+// resources by rescinding the outstanding offers.
+//
+// The allocation interval is set to 1000 seconds, so offers are only
+// triggered explicitly through 'reviveOffers'. Every future produced by a
+// mock action is awaited before it is read, so that a missing event fails
+// the test instead of leaving it stuck.
+TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
+{
+  TestAllocator<> allocator;
+
+  master::Flags masterFlags = CreateMasterFlags();
+  // Turn off allocation. We're doing it manually.
+  masterFlags.allocation_interval = Seconds(1000);
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  // Unlike in the reserve variant of this test, 'available' and 'offered'
+  // are dynamically reserved here; 'unreserved' is their unreserved
+  // counterpart.
+  Resources available = Resources::parse("cpus:1;mem:128").get();
+  available = available.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Resources offered = Resources::parse("mem:384").get();
+  offered = offered.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Resources total = available + offered;
+  Resources unreserved = total.flatten();
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), total));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // We want to get the cluster in a state where 'available' resources are left
+  // in the allocator, and 'offered' resources are offered to the framework.
+  // To achieve this state, we perform the following steps:
+  //   (1) Summon an offer containing 'total' = 'available' + 'offered'.
+  //   (2) Launch a "forever-running" task with 'available' resources.
+  //   (3) Summon an offer containing 'offered'.
+  //   (4) Kill the task, which recovers 'available' resources.
+
+  // Summon an offer and expect to receive 'available + offered' resources.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(available + offered));
+
+  // Launch a task on the 'available' resources portion of the offer, which
+  // recovers 'offered' resources portion.
+  TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
+
+  // Expect a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> recoverUnusedResources;
+  EXPECT_CALL(allocator, recoverResources(_, _, _, _))
+    .WillOnce(DoAll(InvokeRecoverResources(&allocator),
+                    FutureSatisfy(&recoverUnusedResources)));
+
+  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
+
+  // Wait for TASK_RUNNING update ack and for the resources to be recovered.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(recoverUnusedResources);
+
+  // Summon an offer to receive the 'offered' resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(offered));
+
+  // Kill the task running on 'available' resources to make it available.
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  // Wait for the used resources to be recovered.
+  Future<Resources> availableResources;
+  EXPECT_CALL(allocator, recoverResources(_, _, _, _))
+    .WillOnce(DoAll(InvokeRecoverResources(&allocator),
+                    FutureArg<2>(&availableResources)))
+    .WillRepeatedly(DoDefault());
+
+  // Send a KillTask message to the master.
+  driver.killTask(taskInfo.task_id());
+
+  AWAIT_READY(availableResources);
+
+  EXPECT_TRUE(availableResources.get().contains(available));
+
+  // At this point, we have 'available' resources in the allocator, and
+  // 'offered' resources offered to the framework.
+
+  // Expect an offer to be rescinded!
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  response = process::http::post(
+      master.get(),
+      "unreserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), total));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Summon an offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.reviveOffers();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This tests that an attempt to reserve/unreserve more resources than available
+// results in a 'Conflict' HTTP error.
+//
+// The same request ("cpus:4;mem:4096" reserved for the framework's role) is
+// sent to both '/reserve' and '/unreserve'.
+TEST_F(ReservationEndpointsTest, InsufficientResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources unreserved = Resources::parse("cpus:4;mem:4096").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  hashmap<string, string> headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  string body = createRequestBody(slaveId.get(), dynamicallyReserved);
+
+  Future<Response> response =
+    process::http::post(master.get(), "reserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Conflict().status, response);
+
+  response = process::http::post(master.get(), "unreserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Conflict().status, response);
+
+  Shutdown();
+}
+
+
+// This tests that an attempt to reserve/unreserve with no authorization header
+// results in an 'Unauthorized' HTTP error.
+//
+// Both requests pass 'None()' as the headers and are otherwise well-formed.
+TEST_F(ReservationEndpointsTest, NoHeader)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      None(),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Unauthorized("Mesos master").status,
+      response);
+
+  response = process::http::post(
+      master.get(),
+      "unreserve",
+      None(),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Unauthorized("Mesos master").status,
+      response);
+
+  Shutdown();
+}
+
+
+// This tests that an attempt to reserve/unreserve with bad credentials results
+// in an 'Unauthorized' HTTP error.
+//
+// The authorization header is built from the credential
+// "bad-principal"/"bad-secret", while the reservation itself names the
+// principal of 'DEFAULT_CREDENTIAL'.
+TEST_F(ReservationEndpointsTest, BadCredentials)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  Credential credential;
+  credential.set_principal("bad-principal");
+  credential.set_secret("bad-secret");
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved = unreserved.flatten(
+      "role", createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  hashmap<string, string> headers = createBasicAuthHeaders(credential);
+  string body = createRequestBody(slaveId.get(), dynamicallyReserved);
+
+  Future<Response> response =
+    process::http::post(master.get(), "reserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Unauthorized("Mesos master").status,
+      response);
+
+  response = process::http::post(master.get(), "unreserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Unauthorized("Mesos master").status,
+      response);
+
+  Shutdown();
+}
+
+
+// Checks that a request whose body carries 'resources' but no 'slaveId'
+// is answered with a 'BadRequest' HTTP error, for both the "reserve" and
+// the "unreserve" endpoint.
+TEST_F(ReservationEndpointsTest, NoSlaveId)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  Resources plain = Resources::parse("cpus:1;mem:512").get();
+  Resources reserved = plain.flatten(
+      "role", createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  // The body deliberately omits the 'slaveId' parameter.
+  string body = "resources=" + stringify(toJSONArray(reserved));
+  hashmap<string, string> headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  // Send the same request to each endpoint in turn, waiting for each
+  // response before sending the next request.
+  const vector<string> endpoints = {"reserve", "unreserve"};
+
+  for (const string& endpoint : endpoints) {
+    Future<Response> response =
+      process::http::post(master.get(), endpoint, headers, body);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  }
+
+  Shutdown();
+}
+
+
+// This tests that an attempt to reserve/unreserve with no 'resources' results
+// in a 'BadRequest' HTTP error.
+//
+// The request body only carries the 'slaveId' parameter.
+TEST_F(ReservationEndpointsTest, NoResources)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  hashmap<string, string> headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  string body = "slaveId=" + slaveId.get().value();
+
+  Future<Response> response =
+    process::http::post(master.get(), "reserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+
+  response = process::http::post(master.get(), "unreserve", headers, body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+
+  Shutdown();
+}
+
+
+// This tests that an attempt to reserve with a non-matching principal results
+// in a 'BadRequest' HTTP error.
+//
+// The request is authenticated with 'DEFAULT_CREDENTIAL', while the
+// reservation in the body names the principal "badPrincipal". Only the
+// "reserve" endpoint is exercised here.
+TEST_F(ReservationEndpointsTest, NonMatchingPrincipal)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slaveId)));
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  // Make sure the slave has been added to the allocator before its ID is
+  // used, so that a slave that never shows up fails the test cleanly.
+  AWAIT_READY(slaveId);
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved =
+    unreserved.flatten("role", createReservationInfo("badPrincipal"));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slaveId.get(), dynamicallyReserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+
+  Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
[2/3] mesos git commit: Added /unreserve HTTP endpoint to the master.
Posted by mp...@apache.org.
Added /unreserve HTTP endpoint to the master.
Review: https://reviews.apache.org/r/35983
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc9c682e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc9c682e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc9c682e
Branch: refs/heads/master
Commit: cc9c682e0108e0f5aa127e3afe43df517ab6bad7
Parents: 57a7e7d
Author: Michael Park <mp...@apache.org>
Authored: Wed Aug 5 02:00:15 2015 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Sep 9 15:28:29 2015 -0700
----------------------------------------------------------------------
src/master/http.cpp | 181 +++++++++++++++++++++--------
src/master/master.cpp | 5 +
src/master/master.hpp | 27 +++++
src/master/validation.cpp | 2 +-
src/tests/master_validation_tests.cpp | 1 -
5 files changed, 164 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc9c682e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index bcf7f93..a052e55 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -745,56 +745,7 @@ Future<Response> Master::Http::reserve(const Request& request) const
// TODO(mpark): Add a reserve ACL for authorization.
- // The resources recovered by rescinding outstanding offers.
- Resources recovered;
-
- // The unreserved resources needed to satisfy the RESERVE operation.
- // This is used in an optimization where we try to only rescind
- // offers that would contribute to satisfying the Reserve operation.
- Resources remaining = resources.flatten();
-
- // We pessimistically assume that what seems like "available"
- // resources in the allocator will be gone. This can happen due to
- // the race between the allocator scheduling an 'allocate' call to
- // itself vs master's request to schedule 'updateAvailable'.
- // We greedily rescind one offer at time until we've rescinded
- // enough offers to cover for 'resources'.
- foreach (Offer* offer, utils::copy(slave->offers)) {
- // If rescinding the offer would not contribute to satisfying
- // the remaining resources, skip it.
- if (remaining == remaining - offer->resources()) {
- continue;
- }
-
- recovered += offer->resources();
- remaining -= offer->resources();
-
- // We explicitly pass 'Filters()' which has a default 'refuse_sec'
- // of 5 seconds rather than 'None()' here, so that we can
- // virtually always win the race against 'allocate'.
- master->allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- Filters());
-
- master->removeOffer(offer, true); // Rescind!
-
- // If we've rescinded enough offers to cover for 'resources',
- // we're done.
- Try<Resources> updatedRecovered = recovered.apply(operation);
- if (updatedRecovered.isSome()) {
- break;
- }
- }
-
- // Propogate the 'Future<Nothing>' as 'Future<Response>' where
- // 'Nothing' -> 'OK' and Failed -> 'Conflict'.
- return master->apply(slave, operation)
- .then([]() -> Response { return OK(); })
- .repair([](const Future<Response>& result) {
- return Conflict(result.failure());
- });
+ return _operation(slaveId, resources.flatten(), operation);
}
@@ -1818,6 +1769,79 @@ Future<Response> Master::Http::maintenanceStatus(const Request& request) const
}
+Future<Response> Master::Http::unreserve(const Request& request) const
+{
+ if (request.method != "POST") {
+ return BadRequest("Expecting POST");
+ }
+
+ // Parse the query string in the request body.
+ Try<hashmap<string, string>> decode =
+ process::http::query::decode(request.body);
+
+ if (decode.isError()) {
+ return BadRequest("Unable to decode query string: " + decode.error());
+ }
+
+ const hashmap<string, string>& values = decode.get();
+
+ if (values.get("slaveId").isNone()) {
+ return BadRequest("Missing 'slaveId' query parameter");
+ }
+
+ SlaveID slaveId;
+ slaveId.set_value(values.get("slaveId").get());
+
+ Slave* slave = master->slaves.registered.get(slaveId);
+ if (slave == NULL) {
+ return BadRequest("No slave found with specified ID");
+ }
+
+ if (values.get("resources").isNone()) {
+ return BadRequest("Missing 'resources' query parameter");
+ }
+
+ Try<JSON::Array> parse =
+ JSON::parse<JSON::Array>(values.get("resources").get());
+
+ if (parse.isError()) {
+ return BadRequest(
+ "Error in parsing 'resources' query parameter: " + parse.error());
+ }
+
+ Resources resources;
+ foreach (const JSON::Value& value, parse.get().values) {
+ Try<Resource> resource = ::protobuf::parse<Resource>(value);
+ if (resource.isError()) {
+ return BadRequest(
+ "Error in parsing 'resources' query parameter: " + resource.error());
+ }
+ resources += resource.get();
+ }
+
+ Result<Credential> credential = authenticate(request);
+ if (credential.isError()) {
+ return Unauthorized("Mesos master", credential.error());
+ }
+
+ // Create an offer operation.
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::UNRESERVE);
+ operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
+
+ Option<Error> validate =
+ validation::operation::validate(operation.unreserve(), credential.isSome());
+
+ if (validate.isSome()) {
+ return BadRequest("Invalid UNRESERVE operation: " + validate.get().message);
+ }
+
+ // TODO(mpark): Add an unreserve ACL for authorization.
+
+ return _operation(slaveId, resources, operation);
+}
+
+
Result<Credential> Master::Http::authenticate(const Request& request) const
{
// By default, assume everyone is authenticated if no credentials
@@ -1860,6 +1884,63 @@ Result<Credential> Master::Http::authenticate(const Request& request) const
}
+Future<Response> Master::Http::_operation(
+ const SlaveID& slaveId,
+ Resources remaining,
+ const Offer::Operation& operation) const
+{
+ Slave* slave = master->slaves.registered.get(slaveId);
+ if (slave == NULL) {
+ return BadRequest("No slave found with specified ID");
+ }
+
+ // The resources recovered by rescinding outstanding offers.
+ Resources recovered;
+
+ // We pessimistically assume that what seems like "available"
+ // resources in the allocator will be gone. This can happen due to
+ // the race between the allocator scheduling an 'allocate' call to
+ // itself vs master's request to schedule 'updateAvailable'.
+ // We greedily rescind one offer at a time until we've rescinded
+ // enough offers to satisfy 'operation'.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ // If rescinding the offer would not contribute to satisfying
+ // the remaining resources, skip it.
+ if (remaining == remaining - offer->resources()) {
+ continue;
+ }
+
+ recovered += offer->resources();
+ remaining -= offer->resources();
+
+ // We explicitly pass 'Filters()' which has a default 'refuse_sec'
+ // of 5 seconds rather than 'None()' here, so that we can
+ // virtually always win the race against 'allocate'.
+ master->allocator->recoverResources(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ Filters());
+
+ master->removeOffer(offer, true); // Rescind!
+
+ // If we've rescinded enough offers to satisfy 'operation',
+ // we're done.
+ Try<Resources> updatedRecovered = recovered.apply(operation);
+ if (updatedRecovered.isSome()) {
+ break;
+ }
+ }
+
+ // Propagate the 'Future<Nothing>' as 'Future<Response>' where
+ // 'Nothing' -> 'OK' and Failed -> 'Conflict'.
+ return master->apply(slave, operation)
+ .then([]() -> Response { return OK(); })
+ .repair([](const Future<Response>& result) {
+ return Conflict(result.failure());
+ });
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc9c682e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ea7d613..4b60e63 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -845,6 +845,11 @@ void Master::initialize()
Http::log(request);
return http.machineUp(request);
});
+ route("/unreserve",
+ None(), // TODO(mpark): Add an Http::UNRESERVE_HELP,
+ [http](const process::http::Request& request) {
+ return http.unreserve(request);
+ });
// Provide HTTP assets from a "webui" directory. This is either
// specified via flags (which is necessary for running out of the
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc9c682e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7849d68..1dfc947 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -866,6 +866,10 @@ private:
process::Future<process::http::Response> machineUp(
const process::http::Request& request) const;
+ // /master/unreserve
+ process::Future<process::http::Response> unreserve(
+ const process::http::Request& request) const;
+
const static std::string SCHEDULER_HELP;
const static std::string HEALTH_HELP;
const static std::string OBSERVE_HELP;
@@ -893,6 +897,29 @@ private:
const FrameworkID& id,
bool authorized = true) const;
+ /**
+ * Continuation for operations: /reserve, /unreserve, /create and
+ * /destroy. First tries to recover 'remaining' amount of
+ * resources by rescinding outstanding offers, then tries to apply
+ * the operation by calling 'master->apply' and propagates the
+ * 'Future<Nothing>' as 'Future<Response>' where 'Nothing' -> 'OK'
+ * and Failed -> 'Conflict'.
+ *
+ * @param slaveId The ID of the slave that the operation is
+ * updating.
+ * @param remaining The resources needed to satisfy the operation.
+ * This is used for an optimization where we try to only
+ * rescind offers that would contribute to satisfying the
+ * operation.
+ * @param operation The operation to be performed.
+ *
+ * @return 'OK' on success, 'BadRequest' for an unknown slave, else 'Conflict'.
+ */
+ process::Future<process::http::Response> _operation(
+ const SlaveID& slaveId,
+ Resources remaining,
+ const Offer::Operation& operation) const;
+
Master* master;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc9c682e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 0361d1f..f97eba6 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -724,7 +724,7 @@ Option<Error> validate(
}
if (!hasPrincipal) {
- return Error("A framework without a principal cannot unreserve resources.");
+ return Error("Resources cannot be unreserved without a principal.");
}
// NOTE: We don't check that 'FrameworkInfo.principal' matches
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc9c682e/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 3513bca..1dff6a3 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -355,7 +355,6 @@ class UnreserveOperationValidationTest : public MesosTest {};
// This test verifies that any resources can be unreserved by any
// framework with a principal.
-// TODO(mpark): Introduce the "unreserve" ACL to prevent this.
TEST_F(UnreserveOperationValidationTest, WithoutACL)
{
Resource resource = Resources::parse("cpus", "8", "role").get();
[3/3] mesos git commit: Added /reserve HTTP endpoint to the master.
Posted by mp...@apache.org.
Added /reserve HTTP endpoint to the master.
Review: https://reviews.apache.org/r/35702
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57a7e7d0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57a7e7d0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57a7e7d0
Branch: refs/heads/master
Commit: 57a7e7d0283aa08455d6572ade75a11d914c6962
Parents: e758d24
Author: Michael Park <mp...@apache.org>
Authored: Wed Aug 5 00:04:03 2015 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Sep 9 15:28:29 2015 -0700
----------------------------------------------------------------------
src/master/http.cpp | 126 +++++++++++++++++++++++++++++++++++++++++
src/master/master.cpp | 36 +++++++++---
src/master/master.hpp | 24 ++++++--
src/master/validation.cpp | 14 ++---
src/master/validation.hpp | 2 +-
5 files changed, 181 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 94e97a2..bcf7f93 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -78,6 +78,7 @@ using process::USAGE;
using process::http::Accepted;
using process::http::BadRequest;
+using process::http::Conflict;
using process::http::Forbidden;
using process::http::InternalServerError;
using process::http::MethodNotAllowed;
@@ -672,6 +673,131 @@ Future<Response> Master::Http::redirect(const Request& request) const
}
+Future<Response> Master::Http::reserve(const Request& request) const
+{
+ if (request.method != "POST") {
+ return BadRequest("Expecting POST");
+ }
+
+ // Parse the query string in the request body.
+ Try<hashmap<string, string>> decode =
+ process::http::query::decode(request.body);
+
+ if (decode.isError()) {
+ return BadRequest("Unable to decode query string: " + decode.error());
+ }
+
+ const hashmap<string, string>& values = decode.get();
+
+ if (values.get("slaveId").isNone()) {
+ return BadRequest("Missing 'slaveId' query parameter");
+ }
+
+ SlaveID slaveId;
+ slaveId.set_value(values.get("slaveId").get());
+
+ Slave* slave = master->slaves.registered.get(slaveId);
+ if (slave == NULL) {
+ return BadRequest("No slave found with specified ID");
+ }
+
+ if (values.get("resources").isNone()) {
+ return BadRequest("Missing 'resources' query parameter");
+ }
+
+ Try<JSON::Array> parse =
+ JSON::parse<JSON::Array>(values.get("resources").get());
+
+ if (parse.isError()) {
+ return BadRequest(
+ "Error in parsing 'resources' query parameter: " + parse.error());
+ }
+
+ Resources resources;
+ foreach (const JSON::Value& value, parse.get().values) {
+ Try<Resource> resource = ::protobuf::parse<Resource>(value);
+ if (resource.isError()) {
+ return BadRequest(
+ "Error in parsing 'resources' query parameter: " + resource.error());
+ }
+ resources += resource.get();
+ }
+
+ Result<Credential> credential = authenticate(request);
+ if (credential.isError()) {
+ return Unauthorized("Mesos master", credential.error());
+ }
+
+ // Create an offer operation.
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::RESERVE);
+ operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
+
+ Option<string> principal =
+ credential.isSome() ? credential.get().principal() : Option<string>::none();
+
+ Option<Error> validate =
+ validation::operation::validate(operation.reserve(), None(), principal);
+
+ if (validate.isSome()) {
+ return BadRequest("Invalid RESERVE operation: " + validate.get().message);
+ }
+
+ // TODO(mpark): Add a reserve ACL for authorization.
+
+ // The resources recovered by rescinding outstanding offers.
+ Resources recovered;
+
+ // The unreserved resources needed to satisfy the RESERVE operation.
+ // This is used in an optimization where we try to only rescind
+ // offers that would contribute to satisfying the Reserve operation.
+ Resources remaining = resources.flatten();
+
+ // We pessimistically assume that what seems like "available"
+ // resources in the allocator will be gone. This can happen due to
+ // the race between the allocator scheduling an 'allocate' call to
+ // itself vs master's request to schedule 'updateAvailable'.
+ // We greedily rescind one offer at time until we've rescinded
+ // enough offers to cover for 'resources'.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ // If rescinding the offer would not contribute to satisfying
+ // the remaining resources, skip it.
+ if (remaining == remaining - offer->resources()) {
+ continue;
+ }
+
+ recovered += offer->resources();
+ remaining -= offer->resources();
+
+ // We explicitly pass 'Filters()' which has a default 'refuse_sec'
+ // of 5 seconds rather than 'None()' here, so that we can
+ // virtually always win the race against 'allocate'.
+ master->allocator->recoverResources(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ Filters());
+
+ master->removeOffer(offer, true); // Rescind!
+
+ // If we've rescinded enough offers to cover for 'resources',
+ // we're done.
+ Try<Resources> updatedRecovered = recovered.apply(operation);
+ if (updatedRecovered.isSome()) {
+ break;
+ }
+ }
+
+ // Propogate the 'Future<Nothing>' as 'Future<Response>' where
+ // 'Nothing' -> 'OK' and Failed -> 'Conflict'.
+ return master->apply(slave, operation)
+ .then([]() -> Response { return OK(); })
+ .repair([](const Future<Response>& result) {
+ return Conflict(result.failure());
+ });
+}
+
+
const string Master::Http::SLAVES_HELP = HELP(
TLDR(
"Information about registered slaves."),
http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5589eca..ea7d613 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -780,6 +780,11 @@ void Master::initialize()
[http](const process::http::Request& request) {
return http.redirect(request);
});
+ route("/reserve",
+ None(), // TODO(mpark): Add an Http::RESERVE_HELP,
+ [http](const process::http::Request& request) {
+ return http.reserve(request);
+ });
route("/roles.json",
Http::ROLES_HELP,
[http](const process::http::Request& request) {
@@ -2921,7 +2926,7 @@ void Master::_accept(
<< operation.reserve().resources() << " from framework "
<< *framework << " to slave " << *slave;
- applyOfferOperation(framework, slave, operation);
+ apply(framework, slave, operation);
break;
}
@@ -2946,7 +2951,7 @@ void Master::_accept(
<< operation.unreserve().resources() << " from framework "
<< *framework << " to slave " << *slave;
- applyOfferOperation(framework, slave, operation);
+ apply(framework, slave, operation);
break;
}
@@ -2972,7 +2977,7 @@ void Master::_accept(
<< operation.create().volumes() << " from framework "
<< *framework << " to slave " << *slave;
- applyOfferOperation(framework, slave, operation);
+ apply(framework, slave, operation);
break;
}
@@ -2998,7 +3003,7 @@ void Master::_accept(
<< operation.create().volumes() << " from framework "
<< *framework << " to slave " << *slave;
- applyOfferOperation(framework, slave, operation);
+ apply(framework, slave, operation);
break;
}
@@ -5721,7 +5726,7 @@ void Master::removeExecutor(
}
-void Master::applyOfferOperation(
+void Master::apply(
Framework* framework,
Slave* slave,
const Offer::Operation& operation)
@@ -5729,10 +5734,23 @@ void Master::applyOfferOperation(
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
- allocator->updateAllocation(
- framework->id(),
- slave->id,
- {operation});
+ allocator->updateAllocation(framework->id(), slave->id, {operation});
+
+ _apply(slave, operation);
+}
+
+
+Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
+{
+ CHECK_NOTNULL(slave);
+
+ return allocator->updateAvailable(slave->id, {operation})
+ .onReady(defer(self(), &Master::_apply, slave, operation));
+}
+
+
+void Master::_apply(Slave* slave, const Offer::Operation& operation) {
+ CHECK_NOTNULL(slave);
slave->apply(operation);
http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e133185..7849d68 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -672,14 +672,24 @@ protected:
const FrameworkID& frameworkId,
const ExecutorID& executorId);
- // Updates slave's resources by applying the given operation. It
- // also updates the allocator and sends a CheckpointResourcesMessage
- // to the slave with slave's current checkpointed resources.
- void applyOfferOperation(
+ // Updates the allocator and updates the slave's resources by
+ // applying the given operation. It also sends a
+ // 'CheckpointResourcesMessage' to the slave with the updated
+ // checkpointed resources.
+ void apply(
Framework* framework,
Slave* slave,
const Offer::Operation& operation);
+ // Attempts to update the allocator by applying the given operation.
+ // If successful, updates the slave's resources, sends a
+ // 'CheckpointResourcesMessage' to the slave with the updated
+ // checkpointed resources, and returns a 'Future' with 'Nothing'.
+ // Otherwise, no action is taken and returns a failed 'Future'.
+ process::Future<Nothing> apply(
+ Slave* slave,
+ const Offer::Operation& operation);
+
// Forwards the update to the framework.
void forward(
const StatusUpdate& update,
@@ -702,6 +712,8 @@ protected:
Option<Credentials> credentials;
private:
+ void _apply(Slave* slave, const Offer::Operation& operation);
+
void drop(
const process::UPID& from,
const scheduler::Call& call,
@@ -810,6 +822,10 @@ private:
process::Future<process::http::Response> redirect(
const process::http::Request& request) const;
+ // /master/reserve
+ process::Future<process::http::Response> reserve(
+ const process::http::Request& request) const;
+
// /master/roles.json
process::Future<process::http::Response> roles(
const process::http::Request& request) const;
http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index ffb7bf0..0361d1f 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -668,7 +668,7 @@ namespace operation {
Option<Error> validate(
const Offer::Operation::Reserve& reserve,
- const string& role,
+ const Option<string>& role,
const Option<string>& principal)
{
Option<Error> error = resource::validate(reserve.resources());
@@ -677,7 +677,7 @@ Option<Error> validate(
}
if (principal.isNone()) {
- return Error("A framework without a principal cannot reserve resources.");
+ return Error("Cannot reserve resources without a principal.");
}
foreach (const Resource& resource, reserve.resources()) {
@@ -686,18 +686,18 @@ Option<Error> validate(
"Resource " + stringify(resource) + " is not dynamically reserved");
}
- if (resource.role() != role) {
+ if (role.isSome() && resource.role() != role.get()) {
return Error(
"The reserved resource's role '" + resource.role() +
- "' does not match the framework's role '" + role + "'");
+ "' does not match the framework's role '" + role.get() + "'");
}
if (resource.reservation().principal() != principal.get()) {
return Error(
"The reserved resource's principal '" +
- stringify(resource.reservation().principal()) +
- "' does not match the framework's principal '" +
- stringify(principal.get()) + "'");
+ resource.reservation().principal() +
+ "' does not match the principal '" +
+ principal.get() + "'");
}
// NOTE: This check would be covered by 'contains' since there
http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 43b8d84..3434868 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -104,7 +104,7 @@ namespace operation {
// Validates the RESERVE operation.
Option<Error> validate(
const Offer::Operation::Reserve& reserve,
- const std::string& role,
+ const Option<std::string>& role,
const Option<std::string>& principal);