You are viewing a plain text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/23 23:19:24 UTC
mesos git commit: Maintenance Primitives: Accept/Decline responses in
maintenance/status.
Repository: mesos
Updated Branches:
refs/heads/master 2f92699e1 -> a0fd3491e
Maintenance Primitives: Accept/Decline responses in maintenance/status.
Adds a `getInverseOfferStatuses` method to the allocator, which returns
the most recent `InverseOfferStatus` for each framework, grouped by agent.
Changes the `/maintenance/status` endpoint to return this additional
information about draining machines.
Review: https://reviews.apache.org/r/38653
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0fd3491
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0fd3491
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0fd3491
Branch: refs/heads/master
Commit: a0fd3491e2408119f79f7a98e613c2f5ea99c115
Parents: 2f92699
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Sep 23 16:47:19 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Wed Sep 23 17:18:58 2015 -0400
----------------------------------------------------------------------
include/mesos/maintenance/maintenance.proto | 13 ++++-
include/mesos/master/allocator.hpp | 5 ++
include/mesos/master/allocator.proto | 6 ++
src/common/protobuf_utils.cpp | 8 +++
src/common/protobuf_utils.hpp | 4 ++
src/master/allocator/mesos/allocator.hpp | 20 +++++++
src/master/allocator/mesos/hierarchical.hpp | 27 +++++++++
src/master/http.cpp | 71 +++++++++++++++++-------
src/master/master.cpp | 4 ++
src/tests/master_maintenance_tests.cpp | 70 ++++++++++++++++++++++-
src/tests/mesos.hpp | 16 ++++++
11 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/maintenance/maintenance.proto
----------------------------------------------------------------------
diff --git a/include/mesos/maintenance/maintenance.proto b/include/mesos/maintenance/maintenance.proto
index ee01c5d..aaca251 100644
--- a/include/mesos/maintenance/maintenance.proto
+++ b/include/mesos/maintenance/maintenance.proto
@@ -17,6 +17,7 @@
*/
import "mesos/mesos.proto";
+import "mesos/master/allocator.proto";
package mesos.maintenance;
@@ -69,9 +70,17 @@ message Schedule {
/**
* Represents the maintenance status of each machine in the cluster.
- * Corresponds to the `MachineInfo.Mode` enumeration.
+ * The lists correspond to the `MachineInfo.Mode` enumeration.
*/
message ClusterStatus {
- repeated MachineID draining_machines = 1;
+ message DrainingMachine {
+ required MachineID id = 1;
+
+ // A list of the most recent responses to inverse offers from frameworks
+ // running on this draining machine.
+ repeated master.InverseOfferStatus statuses = 2;
+ }
+
+ repeated DrainingMachine draining_machines = 1;
repeated MachineID down_machines = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 2fd00ca..8100f14 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -163,6 +163,11 @@ public:
const Option<InverseOfferStatus>& status,
const Option<Filters>& filters = None()) = 0;
+ // Retrieves the status of all inverse offers maintained by the allocator.
+ virtual process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+ getInverseOfferStatuses() = 0;
+
// Informs the Allocator to recover resources that are considered
// used by the framework.
virtual void recoverResources(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.proto b/include/mesos/master/allocator.proto
index b42f19d..224da71 100644
--- a/include/mesos/master/allocator.proto
+++ b/include/mesos/master/allocator.proto
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+import "mesos/mesos.proto";
+
package mesos.master;
@@ -54,6 +56,10 @@ message InverseOfferStatus {
}
required Status status = 1;
+ required FrameworkID framework_id = 2;
+
+ // Time, since the epoch, when this status was last updated.
+ required TimeInfo timestamp = 3;
// TODO(jmlvanre): Capture decline message.
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 4dc58fe..c1e8e01 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -221,6 +221,14 @@ Label createLabel(const std::string& key, const std::string& value)
return label;
}
+
+TimeInfo getCurrentTime()
+{
+ TimeInfo timeInfo;
+ timeInfo.set_nanoseconds(process::Clock::now().duration().ns());
+ return timeInfo;
+}
+
namespace slave {
ContainerLimitation createContainerLimitation(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 3817c6a..8793851 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -84,6 +84,10 @@ MasterInfo createMasterInfo(const process::UPID& pid);
Label createLabel(const std::string& key, const std::string& value);
+
+// Helper function that fills in a TimeInfo from the current time.
+TimeInfo getCurrentTime();
+
namespace slave {
mesos::slave::ContainerLimitation createContainerLimitation(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index dca2565..c5375aa 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -25,6 +25,7 @@
#include <process/future.hpp>
#include <process/process.hpp>
+#include <stout/hashmap.hpp>
#include <stout/try.hpp>
namespace mesos {
@@ -123,6 +124,10 @@ public:
const Option<mesos::master::InverseOfferStatus>& status,
const Option<Filters>& filters);
+ process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+ getInverseOfferStatuses();
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -232,6 +237,10 @@ public:
const Option<mesos::master::InverseOfferStatus>& status,
const Option<Filters>& filters = None()) = 0;
+ virtual process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+ getInverseOfferStatuses() = 0;
+
virtual void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -506,6 +515,17 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
template <typename AllocatorProcess>
+inline process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+ MesosAllocator<AllocatorProcess>::getInverseOfferStatuses()
+{
+ return process::dispatch(
+ process,
+ &MesosAllocatorProcess::getInverseOfferStatuses);
+}
+
+
+template <typename AllocatorProcess>
inline void MesosAllocator<AllocatorProcess>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 4ec08fd..f3a9b9d 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -161,6 +161,10 @@ public:
const Option<mesos::master::InverseOfferStatus>& status,
const Option<Filters>& filters);
+ process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+ getInverseOfferStatuses();
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -1031,6 +1035,29 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
template <class RoleSorter, class FrameworkSorter>
+process::Future<
+ hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+HierarchicalAllocatorProcess<
+ RoleSorter, FrameworkSorter>::getInverseOfferStatuses()
+{
+ CHECK(initialized);
+
+ hashmap<
+ SlaveID,
+ hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result;
+
+ // Copy the latest statuses; slaves without maintenance info are omitted.
+ foreachpair (const SlaveID& id, const Slave& slave, slaves) {
+ if (slave.maintenance.isSome()) {
+ result[id] = slave.maintenance.get().statuses;
+ }
+ }
+
+ return result;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index fb5315c..a92c276 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -62,6 +62,7 @@
#include "logging/logging.hpp"
+#include "master/machine.hpp"
#include "master/maintenance.hpp"
#include "master/master.hpp"
#include "master/validation.hpp"
@@ -1749,7 +1750,11 @@ const string Master::Http::MAINTENANCE_STATUS_HELP = HELP(
TLDR(
"Retrieves the maintenance status of the cluster."),
DESCRIPTION(
- "Returns an object with one list of machines per machine mode."));
+ "Returns an object with one list of machines per machine mode.",
+ "For draining machines, this list includes the frameworks' responses",
+ "to inverse offers. NOTE: Inverse offer responses are cleared if",
+ "the master fails over. However, new inverse offers will be sent",
+ "once the master recovers."));
// /master/maintenance/status endpoint handler.
@@ -1759,27 +1764,55 @@ Future<Response> Master::Http::maintenanceStatus(const Request& request) const
return BadRequest("Expecting GET, got '" + request.method + "'");
}
- // Unwrap the master's machine information into two arrays of machines.
- mesos::maintenance::ClusterStatus status;
- foreachkey (const MachineID& id, master->machines) {
- switch (master->machines[id].info.mode()) {
- case MachineInfo::DRAINING: {
- status.add_draining_machines()->CopyFrom(id);
- break;
- }
- case MachineInfo::DOWN: {
- status.add_down_machines()->CopyFrom(id);
- break;
- }
- // Currently, `UP` machines are not specifically tracked in the master.
- case MachineInfo::UP: {}
- default: {
- break;
+ return master->allocator->getInverseOfferStatuses()
+ .then(defer(
+ master->self(),
+ [=](
+ hashmap<
+ SlaveID,
+ hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result)
+ -> Future<Response> {
+
+ // Unwrap the master's machine information into two arrays of machines.
+ // The data is coming from the allocator and therefore could be stale.
+ // Also, if the master fails over, this data is cleared.
+ mesos::maintenance::ClusterStatus status;
+ foreachpair (const MachineID& id, const Machine& machine, master->machines) {
+ switch (machine.info.mode()) {
+ case MachineInfo::DRAINING: {
+ mesos::maintenance::ClusterStatus::DrainingMachine* drainingMachine =
+ status.add_draining_machines();
+
+ drainingMachine->mutable_id()->CopyFrom(id);
+
+ // Attach the allocator's inverse offer statuses for each slave here.
+ foreach (const SlaveID& slave, machine.slaves) {
+ if (result.contains(slave)) {
+ foreachvalue (
+ const mesos::master::InverseOfferStatus& inverseOfferStatus,
+ result[slave]) {
+ drainingMachine->add_statuses()->CopyFrom(inverseOfferStatus);
+ }
+ }
+ }
+ break;
+ }
+
+ case MachineInfo::DOWN: {
+ status.add_down_machines()->CopyFrom(id);
+ break;
+ }
+
+ // Currently, `UP` machines are not specifically tracked in the master.
+ case MachineInfo::UP: {}
+ default: {
+ break;
+ }
}
}
- }
- return OK(JSON::Protobuf(status), request.query.get("jsonp"));
+ return OK(JSON::Protobuf(status), request.query.get("jsonp"));
+ }));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 90ef8c6..5ca1941 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2843,6 +2843,8 @@ void Master::accept(
if (inverseOffer != NULL) {
mesos::master::InverseOfferStatus status;
status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
+ status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
+ status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
allocator->updateInverseOffer(
inverseOffer->slave_id(),
@@ -3318,6 +3320,8 @@ void Master::decline(
if (inverseOffer != NULL) { // If this is an inverse offer.
mesos::master::InverseOfferStatus status;
status.set_status(mesos::master::InverseOfferStatus::DECLINE);
+ status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
+ status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
allocator->updateInverseOffer(
inverseOffer->slave_id(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index c5277a1..89ad138 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -1025,11 +1025,13 @@ TEST_F(MasterMaintenanceTest, MachineStatus)
ASSERT_SOME(statuses);
ASSERT_EQ(1, statuses.get().draining_machines().size());
ASSERT_EQ(0, statuses.get().down_machines().size());
- ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).ip());
+ ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).id().ip());
}
// Test ensures that accept and decline works with inverse offers.
+// And that accepted/declined inverse offers will be reflected
+// in the maintenance status endpoint.
TEST_F(MasterMaintenanceTest, InverseOffers)
{
// Set up a master.
@@ -1061,6 +1063,24 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ // Sanity check that this machine shows up in the status endpoint
+ // and there should be no inverse offer status.
+ response = process::http::get(master.get(), "maintenance/status");
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ Try<JSON::Object> statuses_ = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(statuses_);
+ Try<maintenance::ClusterStatus> statuses =
+ ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+ ASSERT_SOME(statuses);
+ ASSERT_EQ(0, statuses.get().down_machines().size());
+ ASSERT_EQ(1, statuses.get().draining_machines().size());
+ ASSERT_EQ(
+ maintenanceHostname,
+ statuses.get().draining_machines(0).id().hostname());
+ ASSERT_EQ(0, statuses.get().draining_machines(0).statuses().size());
+
// Now start a framework.
Callbacks callbacks;
@@ -1194,6 +1214,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
EXPECT_EQ(1, event.get().offers().inverse_offers().size());
inverseOffer = event.get().offers().inverse_offers(0);
+ // Check that the status endpoint shows the inverse offer as declined.
+ response = process::http::get(master.get(), "maintenance/status");
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ statuses_ = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(statuses_);
+ statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+ ASSERT_SOME(statuses);
+ ASSERT_EQ(0, statuses.get().down_machines().size());
+ ASSERT_EQ(1, statuses.get().draining_machines().size());
+ ASSERT_EQ(
+ maintenanceHostname,
+ statuses.get().draining_machines(0).id().hostname());
+
+ ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size());
+ ASSERT_EQ(
+ mesos::master::InverseOfferStatus::DECLINE,
+ statuses.get().draining_machines(0).statuses(0).status());
+
+ ASSERT_EQ(
+ id,
+ evolve(statuses.get().draining_machines(0).statuses(0).framework_id()));
+
{
// Accept an inverse offer, with filter.
Call call;
@@ -1218,6 +1262,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
EXPECT_EQ(0, event.get().offers().offers().size());
EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+ // Check that the status endpoint shows the inverse offer as accepted.
+ response = process::http::get(master.get(), "maintenance/status");
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ statuses_ = JSON::parse<JSON::Object>(response.get().body);
+ ASSERT_SOME(statuses_);
+ statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+ ASSERT_SOME(statuses);
+ ASSERT_EQ(0, statuses.get().down_machines().size());
+ ASSERT_EQ(1, statuses.get().draining_machines().size());
+ ASSERT_EQ(
+ maintenanceHostname,
+ statuses.get().draining_machines(0).id().hostname());
+
+ ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size());
+ ASSERT_EQ(
+ mesos::master::InverseOfferStatus::ACCEPT,
+ statuses.get().draining_machines(0).statuses(0).status());
+
+ ASSERT_EQ(
+ id,
+ evolve(statuses.get().draining_machines(0).statuses(0).framework_id()));
+
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ff241cc..3e58b45 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1365,6 +1365,12 @@ ACTION_P(InvokeUpdateInverseOffer, allocator)
}
+ACTION_P(InvokeGetInverseOfferStatuses, allocator)
+{
+ return allocator->real->getInverseOfferStatuses();
+}
+
+
ACTION_P(InvokeRecoverResources, allocator)
{
allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1504,6 +1510,11 @@ public:
EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
.WillRepeatedly(DoDefault());
+ ON_CALL(*this, getInverseOfferStatuses())
+ .WillByDefault(InvokeGetInverseOfferStatuses(this));
+ EXPECT_CALL(*this, getInverseOfferStatuses())
+ .WillRepeatedly(DoDefault());
+
ON_CALL(*this, recoverResources(_, _, _, _))
.WillByDefault(InvokeRecoverResources(this));
EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1597,6 +1608,11 @@ public:
const Option<mesos::master::InverseOfferStatus>&,
const Option<Filters>&));
+ MOCK_METHOD0(getInverseOfferStatuses, process::Future<
+ hashmap<SlaveID, hashmap<
+ FrameworkID,
+ mesos::master::InverseOfferStatus>>>());
+
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,
const SlaveID&,