You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/23 23:19:24 UTC

mesos git commit: Maintenance Primitives: Accept/Decline responses in maintenance/status.

Repository: mesos
Updated Branches:
  refs/heads/master 2f92699e1 -> a0fd3491e


Maintenance Primitives: Accept/Decline responses in maintenance/status.

Adds a `getInverseOfferStatuses` method to the allocator, which returns
the most recent `InverseOfferStatus` objects, grouped by agent and framework.

Changes the `/maintenance/status` endpoint to return this additional
information about draining machines.

Review: https://reviews.apache.org/r/38653


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0fd3491
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0fd3491
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0fd3491

Branch: refs/heads/master
Commit: a0fd3491e2408119f79f7a98e613c2f5ea99c115
Parents: 2f92699
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Sep 23 16:47:19 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Wed Sep 23 17:18:58 2015 -0400

----------------------------------------------------------------------
 include/mesos/maintenance/maintenance.proto | 13 ++++-
 include/mesos/master/allocator.hpp          |  5 ++
 include/mesos/master/allocator.proto        |  6 ++
 src/common/protobuf_utils.cpp               |  8 +++
 src/common/protobuf_utils.hpp               |  4 ++
 src/master/allocator/mesos/allocator.hpp    | 20 +++++++
 src/master/allocator/mesos/hierarchical.hpp | 27 +++++++++
 src/master/http.cpp                         | 71 +++++++++++++++++-------
 src/master/master.cpp                       |  4 ++
 src/tests/master_maintenance_tests.cpp      | 70 ++++++++++++++++++++++-
 src/tests/mesos.hpp                         | 16 ++++++
 11 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/maintenance/maintenance.proto
----------------------------------------------------------------------
diff --git a/include/mesos/maintenance/maintenance.proto b/include/mesos/maintenance/maintenance.proto
index ee01c5d..aaca251 100644
--- a/include/mesos/maintenance/maintenance.proto
+++ b/include/mesos/maintenance/maintenance.proto
@@ -17,6 +17,7 @@
  */
 
 import "mesos/mesos.proto";
+import "mesos/master/allocator.proto";
 
 package mesos.maintenance;
 
@@ -69,9 +70,17 @@ message Schedule {
 
 /**
  * Represents the maintenance status of each machine in the cluster.
- * Corresponds to the `MachineInfo.Mode` enumeration.
+ * The lists correspond to the `MachineInfo.Mode` enumeration.
  */
 message ClusterStatus {
-  repeated MachineID draining_machines = 1;
+  message DrainingMachine {
+    required MachineID id = 1;
+
+    // The most recent response of each framework to inverse offers for the
+    // agents on this machine. Not persisted: cleared on master failover.
+    repeated master.InverseOfferStatus statuses = 2;
+  }
+
+  repeated DrainingMachine draining_machines = 1;
   repeated MachineID down_machines = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 2fd00ca..8100f14 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -163,6 +163,11 @@ public:
       const Option<InverseOfferStatus>& status,
       const Option<Filters>& filters = None()) = 0;
 
+  // Returns the most recent inverse offer statuses, keyed by agent/framework.
+  virtual process::Future<
+      hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+    getInverseOfferStatuses() = 0;
+
   // Informs the Allocator to recover resources that are considered
   // used by the framework.
   virtual void recoverResources(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/include/mesos/master/allocator.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.proto b/include/mesos/master/allocator.proto
index b42f19d..224da71 100644
--- a/include/mesos/master/allocator.proto
+++ b/include/mesos/master/allocator.proto
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+import "mesos/mesos.proto";
+
 package mesos.master;
 
 
@@ -54,6 +56,10 @@ message InverseOfferStatus {
   }
 
   required Status status = 1;
+  required FrameworkID framework_id = 2;
+
+  // Time, since the epoch, when this status was last updated.
+  required TimeInfo timestamp = 3;
 
   // TODO(jmlvanre): Capture decline message.
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 4dc58fe..c1e8e01 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -221,6 +221,14 @@ Label createLabel(const std::string& key, const std::string& value)
   return label;
 }
 
+
+TimeInfo getCurrentTime()
+{
+  TimeInfo timeInfo;
+  timeInfo.set_nanoseconds(process::Clock::now().duration().ns());
+  return timeInfo;
+}
+
 namespace slave {
 
 ContainerLimitation createContainerLimitation(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 3817c6a..8793851 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -84,6 +84,10 @@ MasterInfo createMasterInfo(const process::UPID& pid);
 
 Label createLabel(const std::string& key, const std::string& value);
 
+
+// Returns a TimeInfo holding `process::Clock::now()` in nanoseconds.
+TimeInfo getCurrentTime();
+
 namespace slave {
 
 mesos::slave::ContainerLimitation createContainerLimitation(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index dca2565..c5375aa 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -25,6 +25,7 @@
 #include <process/future.hpp>
 #include <process/process.hpp>
 
+#include <stout/hashmap.hpp>
 #include <stout/try.hpp>
 
 namespace mesos {
@@ -123,6 +124,10 @@ public:
       const Option<mesos::master::InverseOfferStatus>& status,
       const Option<Filters>& filters);
 
+  process::Future<
+      hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+    getInverseOfferStatuses();
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -232,6 +237,10 @@ public:
       const Option<mesos::master::InverseOfferStatus>& status,
       const Option<Filters>& filters = None()) = 0;
 
+  virtual process::Future<
+      hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+    getInverseOfferStatuses() = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -506,6 +515,17 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
 
 
 template <typename AllocatorProcess>
+inline process::Future<
+    hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+  MesosAllocator<AllocatorProcess>::getInverseOfferStatuses()
+{
+  return process::dispatch(
+      process,
+      &MesosAllocatorProcess::getInverseOfferStatuses);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 4ec08fd..f3a9b9d 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -161,6 +161,10 @@ public:
       const Option<mesos::master::InverseOfferStatus>& status,
       const Option<Filters>& filters);
 
+  process::Future<
+      hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+    getInverseOfferStatuses();
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -1031,6 +1035,29 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
 
 
 template <class RoleSorter, class FrameworkSorter>
+process::Future<
+    hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+HierarchicalAllocatorProcess<
+    RoleSorter, FrameworkSorter>::getInverseOfferStatuses()
+{
+  CHECK(initialized);
+
+  hashmap<
+      SlaveID,
+      hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result;
+
+  // Copy the most recent statuses of every agent with maintenance info.
+  foreachpair (const SlaveID& id, const Slave& slave, slaves) {
+    if (slave.maintenance.isSome()) {
+      result[id] = slave.maintenance.get().statuses;
+    }
+  }
+
+  return result;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index fb5315c..a92c276 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -62,6 +62,7 @@
 
 #include "logging/logging.hpp"
 
+#include "master/machine.hpp"
 #include "master/maintenance.hpp"
 #include "master/master.hpp"
 #include "master/validation.hpp"
@@ -1749,7 +1750,11 @@ const string Master::Http::MAINTENANCE_STATUS_HELP = HELP(
     TLDR(
         "Retrieves the maintenance status of the cluster."),
     DESCRIPTION(
-        "Returns an object with one list of machines per machine mode."));
+        "Returns an object with one list of machines per machine mode.",
+        "For draining machines, this list includes the frameworks' responses",
+        "to inverse offers.  NOTE: Inverse offer responses are cleared if",
+        "the master fails over.  However, new inverse offers will be sent",
+        "once the master recovers."));
 
 
 // /master/maintenance/status endpoint handler.
@@ -1759,27 +1764,55 @@ Future<Response> Master::Http::maintenanceStatus(const Request& request) const
     return BadRequest("Expecting GET, got '" + request.method + "'");
   }
 
-  // Unwrap the master's machine information into two arrays of machines.
-  mesos::maintenance::ClusterStatus status;
-  foreachkey (const MachineID& id, master->machines) {
-    switch (master->machines[id].info.mode()) {
-      case MachineInfo::DRAINING: {
-        status.add_draining_machines()->CopyFrom(id);
-        break;
-      }
-      case MachineInfo::DOWN: {
-        status.add_down_machines()->CopyFrom(id);
-        break;
-      }
-      // Currently, `UP` machines are not specifically tracked in the master.
-      case MachineInfo::UP: {}
-      default: {
-        break;
+  return master->allocator->getInverseOfferStatuses()
+    .then(defer(
+        master->self(),
+        [=](
+            hashmap<
+                SlaveID,
+                hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result)
+          -> Future<Response> {
+
+    // Unwrap the master's machine information into two arrays of machines.
+    // The inverse offer statuses come from the allocator, so they may be
+    // stale, and they are cleared if the master fails over.
+    mesos::maintenance::ClusterStatus status;
+    foreachpair (const MachineID& id, const Machine& machine, master->machines) {
+      switch (machine.info.mode()) {
+        case MachineInfo::DRAINING: {
+          mesos::maintenance::ClusterStatus::DrainingMachine* drainingMachine =
+            status.add_draining_machines();
+
+          drainingMachine->mutable_id()->CopyFrom(id);
+
+          // Unwrap inverse offer status information from the allocator.
+          foreach (const SlaveID& slave, machine.slaves) {
+            if (result.contains(slave)) {
+              foreachvalue (
+                  const mesos::master::InverseOfferStatus& inverseOfferStatus,
+                  result[slave]) {
+                drainingMachine->add_statuses()->CopyFrom(inverseOfferStatus);
+              }
+            }
+          }
+          break;
+        }
+
+        case MachineInfo::DOWN: {
+          status.add_down_machines()->CopyFrom(id);
+          break;
+        }
+
+        // Currently, `UP` machines are not specifically tracked in the master.
+        case MachineInfo::UP: {}
+        default: {
+          break;
+        }
       }
     }
-  }
 
-  return OK(JSON::Protobuf(status), request.query.get("jsonp"));
+    return OK(JSON::Protobuf(status), request.query.get("jsonp"));
+  }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 90ef8c6..5ca1941 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2843,6 +2843,8 @@ void Master::accept(
       if (inverseOffer != NULL) {
         mesos::master::InverseOfferStatus status;
         status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
+        status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
+        status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
 
         allocator->updateInverseOffer(
             inverseOffer->slave_id(),
@@ -3318,6 +3320,8 @@ void Master::decline(
     if (inverseOffer != NULL) { // If this is an inverse offer.
       mesos::master::InverseOfferStatus status;
       status.set_status(mesos::master::InverseOfferStatus::DECLINE);
+      status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
+      status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
 
       allocator->updateInverseOffer(
           inverseOffer->slave_id(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index c5277a1..89ad138 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -1025,11 +1025,13 @@ TEST_F(MasterMaintenanceTest, MachineStatus)
   ASSERT_SOME(statuses);
   ASSERT_EQ(1, statuses.get().draining_machines().size());
   ASSERT_EQ(0, statuses.get().down_machines().size());
-  ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).ip());
+  ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).id().ip());
 }
 
 
 // Test ensures that accept and decline works with inverse offers.
+// Also ensures that accepted/declined inverse offers are reflected
+// in the maintenance status endpoint.
 TEST_F(MasterMaintenanceTest, InverseOffers)
 {
   // Set up a master.
@@ -1061,6 +1063,24 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
+  // Sanity check that this machine shows up in the status endpoint
+  // and there should be no inverse offer status.
+  response = process::http::get(master.get(), "maintenance/status");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  Try<JSON::Object> statuses_ = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(statuses_);
+  Try<maintenance::ClusterStatus> statuses =
+    ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+  ASSERT_SOME(statuses);
+  ASSERT_EQ(0, statuses.get().down_machines().size());
+  ASSERT_EQ(1, statuses.get().draining_machines().size());
+  ASSERT_EQ(
+      maintenanceHostname,
+      statuses.get().draining_machines(0).id().hostname());
+  ASSERT_EQ(0, statuses.get().draining_machines(0).statuses().size());
+
   // Now start a framework.
   Callbacks callbacks;
 
@@ -1194,6 +1214,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   EXPECT_EQ(1, event.get().offers().inverse_offers().size());
   inverseOffer = event.get().offers().inverse_offers(0);
 
+  // Check that the status endpoint shows the inverse offer as declined.
+  response = process::http::get(master.get(), "maintenance/status");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  statuses_ = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(statuses_);
+  statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+  ASSERT_SOME(statuses);
+  ASSERT_EQ(0, statuses.get().down_machines().size());
+  ASSERT_EQ(1, statuses.get().draining_machines().size());
+  ASSERT_EQ(
+      maintenanceHostname,
+      statuses.get().draining_machines(0).id().hostname());
+
+  ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size());
+  ASSERT_EQ(
+      mesos::master::InverseOfferStatus::DECLINE,
+      statuses.get().draining_machines(0).statuses(0).status());
+
+  ASSERT_EQ(
+      id,
+      evolve(statuses.get().draining_machines(0).statuses(0).framework_id()));
+
   {
     // Accept an inverse offer, with filter.
     Call call;
@@ -1218,6 +1262,30 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   EXPECT_EQ(0, event.get().offers().offers().size());
   EXPECT_EQ(1, event.get().offers().inverse_offers().size());
 
+  // Check that the status endpoint shows the inverse offer as accepted.
+  response = process::http::get(master.get(), "maintenance/status");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  statuses_ = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(statuses_);
+  statuses = ::protobuf::parse<maintenance::ClusterStatus>(statuses_.get());
+
+  ASSERT_SOME(statuses);
+  ASSERT_EQ(0, statuses.get().down_machines().size());
+  ASSERT_EQ(1, statuses.get().draining_machines().size());
+  ASSERT_EQ(
+      maintenanceHostname,
+      statuses.get().draining_machines(0).id().hostname());
+
+  ASSERT_EQ(1, statuses.get().draining_machines(0).statuses().size());
+  ASSERT_EQ(
+      mesos::master::InverseOfferStatus::ACCEPT,
+      statuses.get().draining_machines(0).statuses(0).status());
+
+  ASSERT_EQ(
+      id,
+      evolve(statuses.get().draining_machines(0).statuses(0).framework_id()));
+
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0fd3491/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ff241cc..3e58b45 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1365,6 +1365,12 @@ ACTION_P(InvokeUpdateInverseOffer, allocator)
 }
 
 
+ACTION_P(InvokeGetInverseOfferStatuses, allocator)
+{
+  return allocator->real->getInverseOfferStatuses();
+}
+
+
 ACTION_P(InvokeRecoverResources, allocator)
 {
   allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1504,6 +1510,11 @@ public:
     EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
       .WillRepeatedly(DoDefault());
 
+    ON_CALL(*this, getInverseOfferStatuses())
+      .WillByDefault(InvokeGetInverseOfferStatuses(this));
+    EXPECT_CALL(*this, getInverseOfferStatuses())
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeRecoverResources(this));
     EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1597,6 +1608,11 @@ public:
       const Option<mesos::master::InverseOfferStatus>&,
       const Option<Filters>&));
 
+  MOCK_METHOD0(getInverseOfferStatuses, process::Future<
+      hashmap<SlaveID, hashmap<
+          FrameworkID,
+          mesos::master::InverseOfferStatus>>>());
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,