You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/07/16 20:43:22 UTC

[mesos] 05/09: Implemented master endpoints for agent draining.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3066c88d353c466b997af83ff58ada77ee8d5964
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Tue Jul 2 11:33:33 2019 -0700

    Implemented master endpoints for agent draining.
    
    This fleshes out three master calls:
      * DRAIN_AGENT
      * DEACTIVATE_AGENT
      * REACTIVATE_AGENT
    
    The master now stores a mapping of agent draining or deactivation
    information.  When an agent reconnects, this information is checked
    before informing the allocator about the agent.
    
    This commit leaves out certain aspects of each endpoint's validation,
    such as checking that draining agents are not in maintenance schedules.
    
    The DRAIN_AGENT call is not completely implemented here, because the
    transition from DRAINING to DRAINED will be added in a separate commit.
    
    Review: https://reviews.apache.org/r/70996
---
 src/master/http.cpp   | 120 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/master/master.cpp |  50 ++++++++++++++++++---
 src/master/master.hpp |  27 +++++++++++-
 3 files changed, 185 insertions(+), 12 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index b077dd7..13db2d8 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3872,7 +3872,66 @@ Future<Response> Master::Http::_drainAgent(
     const bool markGone,
     const Owned<ObjectApprovers>& approvers) const
 {
-  return NotImplemented();
+  if (!approvers->approved<DRAIN_AGENT>()) {
+    return Forbidden();
+  }
+
+  if (markGone && !approvers->approved<MARK_AGENT_GONE>()) {
+    return Forbidden();
+  }
+
+  // Check that the agent is either recovering, registered, or unreachable.
+  if (!master->slaves.recovered.contains(slaveId) &&
+      !master->slaves.registered.contains(slaveId) &&
+      !master->slaves.unreachable.contains(slaveId)) {
+    return BadRequest("Unknown agent");
+  }
+
+  // If this agent is being marked gone, then no draining can be performed.
+  if (master->slaves.markingGone.contains(slaveId)) {
+    return Conflict("Agent is currently being marked gone");
+  }
+
+  // Save the draining info to the registry.
+  return master->registrar->apply(Owned<RegistryOperation>(
+      new DrainAgent(slaveId, maxGracePeriod, markGone)))
+    .onAny([](const Future<bool>& result) {
+      CHECK_READY(result)
+        << "Failed to update draining info in the registry";
+    })
+    .then(defer(
+        master->self(),
+        [this, slaveId, maxGracePeriod, markGone](bool result) -> Response {
+          // Update the in-memory state.
+          DrainConfig drainConfig;
+          drainConfig.set_mark_gone(markGone);
+
+          if (maxGracePeriod.isSome()) {
+            drainConfig.mutable_max_grace_period()
+              ->CopyFrom(maxGracePeriod.get());
+          }
+
+          DrainInfo drainInfo;
+          drainInfo.set_state(DRAINING);
+          drainInfo.mutable_config()->CopyFrom(drainConfig);
+
+          master->slaves.draining[slaveId] = drainInfo;
+
+          // Deactivate the agent.
+          master->slaves.deactivated.insert(slaveId);
+
+          Slave* slave = master->slaves.registered.get(slaveId);
+          if (slave != nullptr) {
+            master->deactivate(slave);
+
+            // Tell the agent to start draining.
+            DrainSlaveMessage message;
+            message.mutable_config()->CopyFrom(drainConfig);
+            master->send(slave->pid, message);
+          }
+
+          return OK();
+        }));
 }
 
 
@@ -3910,7 +3969,35 @@ Future<Response> Master::Http::_deactivateAgent(
     const SlaveID& slaveId,
     const Owned<ObjectApprovers>& approvers) const
 {
-  return NotImplemented();
+  if (!approvers->approved<DEACTIVATE_AGENT>()) {
+    return Forbidden();
+  }
+
+  // Check that the agent is either recovering, registered, or unreachable.
+  if (!master->slaves.recovered.contains(slaveId) &&
+      !master->slaves.registered.contains(slaveId) &&
+      !master->slaves.unreachable.contains(slaveId)) {
+    return BadRequest("Unknown agent");
+  }
+
+  // Save the deactivation to the registry.
+  return master->registrar->apply(Owned<RegistryOperation>(
+      new DeactivateAgent(slaveId)))
+    .onAny([](const Future<bool>& result) {
+      CHECK_READY(result)
+        << "Failed to deactivate agent in the registry";
+    })
+    .then(defer(master->self(), [this, slaveId](bool result) -> Response {
+      // Deactivate the agent.
+      master->slaves.deactivated.insert(slaveId);
+
+      Slave* slave = master->slaves.registered.get(slaveId);
+      if (slave != nullptr) {
+        master->deactivate(slave);
+      }
+
+      return OK();
+    }));
 }
 
 
@@ -3941,7 +4028,34 @@ Future<Response> Master::Http::_reactivateAgent(
     const SlaveID& slaveId,
     const Owned<ObjectApprovers>& approvers) const
 {
-  return NotImplemented();
+  if (!approvers->approved<REACTIVATE_AGENT>()) {
+    return Forbidden();
+  }
+
+  // Check that the agent is deactivated.
+  if (!master->slaves.deactivated.contains(slaveId)) {
+    return BadRequest("Agent is not deactivated");
+  }
+
+  // Save the reactivation to the registry.
+  return master->registrar->apply(Owned<RegistryOperation>(
+      new ReactivateAgent(slaveId)))
+    .onAny([](const Future<bool>& result) {
+      CHECK_READY(result)
+        << "Failed to reactivate agent in the registry";
+    })
+    .then(defer(master->self(), [this, slaveId](bool result) -> Response {
+      // Reactivate the agent.
+      master->slaves.draining.erase(slaveId);
+      master->slaves.deactivated.erase(slaveId);
+
+      Slave* slave = master->slaves.registered.get(slaveId);
+      if (slave != nullptr) {
+        master->reactivate(slave);
+      }
+
+      return OK();
+    }));
 }
 
 
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2a59f89..19275d5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1695,12 +1695,30 @@ Future<Nothing> Master::_recover(const Registry& registry)
     upgradeResources(&slaveInfo);
 
     slaves.recovered.put(slaveInfo.id(), slaveInfo);
+
+    // Recover the draining and deactivation states.
+    if (slave.has_drain_info()) {
+      slaves.draining[slaveInfo.id()] = slave.drain_info();
+    }
+
+    if (slave.has_deactivated() && slave.deactivated()) {
+      slaves.deactivated.insert(slaveInfo.id());
+    }
   }
 
   foreach (const Registry::UnreachableSlave& unreachable,
            registry.unreachable().slaves()) {
     CHECK(!slaves.unreachable.contains(unreachable.id()));
     slaves.unreachable[unreachable.id()] = unreachable.timestamp();
+
+    // Recover the draining and deactivation states.
+    if (unreachable.has_drain_info()) {
+      slaves.draining[unreachable.id()] = unreachable.drain_info();
+    }
+
+    if (unreachable.has_deactivated() && unreachable.deactivated()) {
+      slaves.deactivated.insert(unreachable.id());
+    }
   }
 
   foreach (const Registry::GoneSlave& gone,
@@ -3427,6 +3445,18 @@ void Master::deactivate(Slave* slave)
 }
 
 
+void Master::reactivate(Slave* slave)
+{
+  CHECK_NOTNULL(slave);
+  CHECK(!slaves.deactivated.contains(slave->id));
+
+  LOG(INFO) << "Reactivating agent " << *slave;
+
+  slave->active = true;
+  allocator->activateSlave(slave->id);
+}
+
+
 void Master::resourceRequest(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -7155,9 +7185,6 @@ void Master::_registerSlave(
                   "a new agent registered at the same address",
                   metrics->slave_removals_reason_registered);
     } else {
-      CHECK(slave->active)
-        << "Unexpected connected but deactivated agent " << *slave;
-
       LOG(INFO) << "Agent " << *slave << " already registered,"
                 << " resending acknowledgement";
 
@@ -8034,12 +8061,21 @@ void Master::___reregisterSlave(
     slave->connected = true;
     dispatch(slave->observer, &SlaveObserver::reconnect);
 
-    slave->active = true;
-    allocator->activateSlave(slave->id);
+    if (!slaves.deactivated.contains(slave->id)) {
+      reactivate(slave);
+    }
   }
 
-  CHECK(slave->active)
-    << "Unexpected connected but deactivated agent " << *slave;
+  // If this is a draining agent, send it the drain message.
+  // We do this regardless of the draining state (DRAINING or DRAINED),
+  // because the agent is expected to handle the message in either state.
+  if (slaves.draining.contains(slaveInfo.id())) {
+    DrainSlaveMessage message;
+    message.mutable_config()->CopyFrom(
+        slaves.draining.at(slaveInfo.id()).config());
+
+    send(slave->pid, message);
+  }
 
   // Inform the agent of the new framework pids for its tasks, and
   // recover any unknown frameworks from the slave info.
diff --git a/src/master/master.hpp b/src/master/master.hpp
index dda508f..205cdb2 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -213,8 +213,8 @@ Slave(Master* const _master,
   // Slave becomes disconnected when the socket closes.
   bool connected;
 
-  // Slave becomes deactivated when it gets disconnected. In the
-  // future this might also happen via HTTP endpoint.
+  // Slave becomes deactivated when it gets disconnected, or when the
+  // agent is deactivated via the DRAIN_AGENT or DEACTIVATE_AGENT calls.
   // No offers will be made for a deactivated slave.
   bool active;
 
@@ -675,8 +675,15 @@ protected:
   void deactivate(Framework* framework, bool rescind);
 
   void disconnect(Slave* slave);
+
+  // Removes the agent from the resource offer cycle (and rescinds active
+  // offers). Other aspects of the agent will continue to function normally.
   void deactivate(Slave* slave);
 
+  // Adds the agent back to the resource offer cycle.
+  // Must *NOT* be called if the agent is `deactivated`.
+  void reactivate(Slave* slave);
+
   // Add a slave.
   void addSlave(
       Slave* slave,
@@ -2132,6 +2139,22 @@ private:
     // Slaves that are in the process of being marked gone.
     hashset<SlaveID> markingGone;
 
+    // Agents which have been marked for draining, including recovered,
+    // admitted, and unreachable agents. All draining agents will also
+    // be deactivated. If an agent in this set reregisters, the master
+    // will send it a `DrainSlaveMessage`.
+    //
+    // These values are checkpointed to the registry.
+    hashmap<SlaveID, DrainInfo> draining;
+
+    // Agents which have been deactivated, including recovered, admitted,
+    // and unreachable agents. Agents in this set will not have resource
+    // offers generated and will thus be unable to launch new operations,
+    // but existing operations will be unaffected.
+    //
+    // These values are checkpointed to the registry.
+    hashset<SlaveID> deactivated;
+
     // This collection includes agents that have gracefully shutdown,
     // as well as those that have been marked unreachable or gone. We
     // keep a cache here to prevent this from growing in an unbounded