You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2018/01/10 23:03:45 UTC

[2/2] mesos git commit: Unified the marking agent unreachable logic in the master.

Unified the marking agent unreachable logic in the master.

The logic for marking an agent unreachable in the master had two
very similar code paths that differed slightly across failover
and steady state cases. This patch uses a single code path.

Unfortunately, some slight differences were necessary, and a
failover boolean was introduced to accommodate them. Specifically,
the failover and steady state cases expect the agent to reside
in the recovered and registered lists, respectively.

Review: https://reviews.apache.org/r/64930


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/74b2a0b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/74b2a0b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/74b2a0b7

Branch: refs/heads/master
Commit: 74b2a0b7125d6ac5fc8fdc535f4cf57e67cbf061
Parents: 7116011
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 3 14:08:36 2018 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Jan 10 14:44:20 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp     | 305 +++++++++++++++++------------------------
 src/master/master.hpp     |  31 +++--
 src/tests/slave_tests.cpp |   7 +-
 3 files changed, 150 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/74b2a0b7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6fc5de8..daf2f7e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -271,7 +271,8 @@ protected:
 
       dispatch(master,
                &Master::markUnreachable,
-               slaveId,
+               slaveInfo,
+               false,
                "health check timed out");
     } else if (future.isDiscarded()) {
       LOG(INFO) << "Canceling transition of agent " << slaveId
@@ -1405,7 +1406,8 @@ Nothing Master::_agentReregisterTimeout(const SlaveID& slaveId)
   ++metrics->slave_unreachable_completed;
 
   markUnreachable(
-      slaveId,
+      slave->info,
+      false,
       "agent did not re-register within " +
       stringify(flags.agent_reregister_timeout) +
       " after disconnecting");
@@ -2006,124 +2008,35 @@ void Master::recoveredSlavesTimeout(const Registry& registry)
     const string failure = "Agent removal rate limit acquisition failed";
 
     // TODO(bmahler): Cancelation currently occurs within by returning
-    // early from `markUnreachableAfterFailover` *without* the
-    // "discarder" having discarded the rate limit token. This approach
-    // means that if agents re-register while many of the marking
-    // unreachable operations are in progress, the rate that we mark
-    // unreachable will "slow down" rather than stay constant. We
-    // should instead discard the rate limit token when the agent
-    // re-registers and handle the discard here. See MESOS-8386.
+    // early from `markUnreachable` *without* the "discarder" having
+    // discarded the rate limit token. This approach means that if
+    // agents re-register while many of the marking unreachable
+    // operations are in progress, the rate that we mark unreachable
+    // will "slow down" rather than stay constant. We should instead
+    // discard the rate limit token when the agent re-registers and
+    // handle the discard here. See MESOS-8386.
     acquire
-      .then(defer(self(), &Self::markUnreachableAfterFailover, slave.info()))
       .onFailed(lambda::bind(fail, failure, lambda::_1))
-      .onDiscarded(lambda::bind(fail, failure, "discarded"));
-
-    ++metrics->slave_unreachable_scheduled;
-  }
-}
-
-
-Nothing Master::markUnreachableAfterFailover(const SlaveInfo& slave)
-{
-  // The slave might have reregistered while we were waiting to
-  // acquire the rate limit.
-  if (!slaves.recovered.contains(slave.id())) {
-    LOG(INFO) << "Canceling transition of agent "
-              << slave.id() << " (" << slave.hostname() << ")"
-              << " to unreachable because it re-registered";
-
-    ++metrics->slave_unreachable_canceled;
-    return Nothing();
-  }
-
-  // The slave might be in the process of reregistering.
-  if (slaves.reregistering.contains(slave.id())) {
-    LOG(INFO) << "Canceling transition of agent "
-              << slave.id() << " (" << slave.hostname() << ")"
-              << " to unreachable because it is re-registering";
-
-    ++metrics->slave_unreachable_canceled;
-    return Nothing();
-  }
-
-  if (slaves.markingGone.contains(slave.id())) {
-    LOG(INFO) << "Canceling transition of agent "
-              << slave.id() << " (" << slave.hostname() << ")"
-              << " to unreachable because an agent gone"
-              << " operation is in progress";
-
-    ++metrics->slave_unreachable_canceled;
-    return Nothing();
-  }
-
-  if (slaves.gone.contains(slave.id())) {
-    LOG(INFO) << "Canceling transition of agent "
-              << slave.id() << " (" << slave.hostname() << ")"
-              << " to unreachable because the agent has"
-              << " been marked gone";
-
-    ++metrics->slave_unreachable_canceled;
-    return Nothing();
-  }
-
-  LOG(WARNING) << "Agent " << slave.id()
-               << " (" << slave.hostname() << ") did not re-register"
-               << " within " << flags.agent_reregister_timeout
-               << " after master failover; marking it unreachable";
-
-  ++metrics->slave_unreachable_completed;
-
-  TimeInfo unreachableTime = protobuf::getCurrentTime();
-
-  slaves.markingUnreachable.insert(slave.id());
-
-  registrar->apply(Owned<RegistryOperation>(
-          new MarkSlaveUnreachable(slave, unreachableTime)))
-    .onAny(defer(self(),
-                 &Self::_markUnreachableAfterFailover,
-                 slave,
-                 unreachableTime,
-                 lambda::_1));
-
-  return Nothing();
-}
-
-
-void Master::_markUnreachableAfterFailover(
-    const SlaveInfo& slaveInfo,
-    const TimeInfo& unreachableTime,
-    const Future<bool>& registrarResult)
-{
-  CHECK(slaves.markingUnreachable.contains(slaveInfo.id()));
-  slaves.markingUnreachable.erase(slaveInfo.id());
+      .onDiscarded(lambda::bind(fail, failure, "discarded"))
+      .then(defer(self(),
+                  &Self::markUnreachable,
+                  slave.info(),
+                  true,
+                  "did not re-register within"
+                  " " + stringify(flags.agent_reregister_timeout) +
+                  " after master failover"))
+      .then(defer(self(), [=](bool marked) {
+        if (marked) {
+          ++metrics->slave_unreachable_completed;
+        } else {
+          ++metrics->slave_unreachable_canceled;
+        }
 
-  CHECK(slaves.recovered.contains(slaveInfo.id()));
-  slaves.recovered.erase(slaveInfo.id());
+        return Nothing();
+      }));
 
-  if (registrarResult.isFailed()) {
-    LOG(FATAL) << "Failed to mark agent " << slaveInfo.id()
-               << " (" << slaveInfo.hostname() << ")"
-               << " unreachable in the registry: "
-               << registrarResult.failure();
+    ++metrics->slave_unreachable_scheduled;
   }
-
-  CHECK(!registrarResult.isDiscarded());
-
-  // `MarkSlaveUnreachable` registry operation should never fail.
-  CHECK(registrarResult.get());
-
-  LOG(INFO) << "Marked agent " << slaveInfo.id() << " ("
-            << slaveInfo.hostname() << ") unreachable: "
-            << "did not re-register after master failover";
-
-  ++metrics->slave_removals;
-  ++metrics->slave_removals_reason_unhealthy;
-  ++metrics->recovery_slave_removals;
-
-  CHECK(!slaves.unreachable.contains(slaveInfo.id()));
-  slaves.unreachable[slaveInfo.id()] = unreachableTime;
-
-  sendSlaveLost(slaveInfo);
 }
 
 
@@ -8193,23 +8106,40 @@ void Master::shutdown(
 }
 
 
-// TODO(neilc): Refactor to reduce code duplication with
-// `Master::removeSlave`.
-void Master::markUnreachable(const SlaveID& slaveId, const string& message)
+Future<bool> Master::markUnreachable(
+    const SlaveInfo& slave,
+    bool duringMasterFailover,
+    const string& message)
 {
-  Slave* slave = slaves.registered.get(slaveId);
+  if (duringMasterFailover && !slaves.recovered.contains(slave.id())) {
+    LOG(INFO) << "Skipping transition of agent"
+              << " " << slave.id() << " (" << slave.hostname() << ")"
+              << " to unreachable because it re-registered in the interim";
+    return false;
+  }
 
-  if (slave == nullptr) {
-    // Possible when the `SlaveObserver` dispatches a message to mark an
-    // unhealthy slave as unreachable, but the slave is concurrently
-    // removed for another reason (e.g., `UnregisterSlaveMessage` is
-    // received).
-    LOG(WARNING) << "Unable to mark unknown agent "
-                 << slaveId << " unreachable";
-    return;
+  if (!duringMasterFailover && !slaves.registered.contains(slave.id())) {
+    // Possible when the `SlaveObserver` dispatches a message to
+    // mark an unhealthy slave as unreachable, but the slave is
+    // concurrently removed for another reason (e.g.,
+    // `UnregisterSlaveMessage` is received).
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because it has already been removed"
+                 << " or marked unreachable";
+    return false;
   }
 
-  if (slaves.markingUnreachable.contains(slaveId)) {
+  // The slave might be in the process of reregistering without
+  // the marking unreachable having been canceled.
+  if (slaves.reregistering.contains(slave.id())) {
+    LOG(INFO) << "Skipping transition of agent"
+              << " " << slave.id() << " (" << slave.hostname() << ")"
+              << " to unreachable because it is re-registering";
+    return false;
+  }
+
+  if (slaves.markingUnreachable.contains(slave.id())) {
     // We might already be marking this slave unreachable. This is
     // possible if marking the slave unreachable in the registry takes
     // a long time. While the registry operation is in progress, the
@@ -8218,89 +8148,106 @@ void Master::markUnreachable(const SlaveID& slaveId, const string& message)
     // another attempt to mark it unreachable. Also possible if
     // `agentReregisterTimeout` marks the slave unreachable
     // concurrently with the slave observer doing so.
-    LOG(WARNING) << "Not marking agent " << slaveId
-                 << " unreachable because another unreachable"
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because another unreachable"
                  << " transition is already in progress";
-    return;
+    return false;
   }
 
-  if (slaves.removing.contains(slaveId)) {
-    LOG(WARNING) << "Not marking agent " << slaveId
-                 << " unreachable because it is unregistering";
-    return;
+  if (slaves.removing.contains(slave.id())) {
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because it is being removed";
+    return false;
   }
 
-  if (slaves.markingGone.contains(slaveId)) {
-    LOG(INFO) << "Canceling transition of agent " << slaveId
-              << " to unreachable because an agent gone"
-              << " operation is in progress";
-    return;
+  if (slaves.removed.get(slave.id()).isSome()) {
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because it has been removed";
+    return false;
   }
 
-  if (slaves.gone.contains(slaveId)) {
-    LOG(INFO) << "Canceling transition of agent " << slaveId
-              << " to unreachable because the agent has"
-              << " been marked gone";
-    return;
+  if (slaves.markingGone.contains(slave.id())) {
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because it is being marked as gone";
+    return false;
   }
 
-  LOG(INFO) << "Marking agent " << *slave
-            << " unreachable: " << message;
+  if (slaves.gone.contains(slave.id())) {
+    LOG(WARNING) << "Skipping transition of agent"
+                 << " " << slave.id() << " (" << slave.hostname() << ")"
+                 << " to unreachable because it has been marked as gone";
+    return false;
+  }
 
-  CHECK(!slaves.unreachable.contains(slaveId));
-  CHECK(slaves.removed.get(slaveId).isNone());
+  LOG(INFO) << "Marking agent " << slave.id() << " (" << slave.hostname() << ")"
+            << " unreachable: " << message;
 
-  slaves.markingUnreachable.insert(slave->id);
+  CHECK(!slaves.unreachable.contains(slave.id()));
+  slaves.markingUnreachable.insert(slave.id());
 
-  // Use the same timestamp for all status updates sent below; we also
-  // use this timestamp when updating the registry.
+  // Use the same timestamp for all status updates sent below;
+  // we also use this timestamp when updating the registry.
   TimeInfo unreachableTime = protobuf::getCurrentTime();
 
+  const string failure = "Failed to mark agent " + stringify(slave.id()) +
+    " (" + slave.hostname() + ") as unreachable in the registry";
+
   // Update the registry to move this slave from the list of admitted
   // slaves to the list of unreachable slaves. After this is complete,
   // we can remove the slave from the master's in-memory state and
   // send TASK_UNREACHABLE / TASK_LOST updates to the frameworks.
-  registrar->apply(Owned<RegistryOperation>(
-          new MarkSlaveUnreachable(slave->info, unreachableTime)))
-    .onAny(defer(self(),
-                 &Self::_markUnreachable,
-                 slave,
-                 unreachableTime,
-                 message,
-                 lambda::_1));
+  return undiscardable(
+      registrar->apply(Owned<RegistryOperation>(
+          new MarkSlaveUnreachable(slave, unreachableTime)))
+      .onFailed(lambda::bind(fail, failure, lambda::_1))
+      .onDiscarded(lambda::bind(fail, failure, "discarded"))
+      .then(defer(self(), [=](bool result) {
+        _markUnreachable(
+            slave, unreachableTime, duringMasterFailover, message, result);
+        return true;
+      })));
 }
 
 
 void Master::_markUnreachable(
-    Slave* slave,
+    const SlaveInfo& slave,
     const TimeInfo& unreachableTime,
+    bool duringMasterFailover,
     const string& message,
-    const Future<bool>& registrarResult)
+    bool registrarResult)
 {
-  CHECK_NOTNULL(slave);
-  CHECK(slaves.markingUnreachable.contains(slave->info.id()));
-  slaves.markingUnreachable.erase(slave->info.id());
-
-  if (registrarResult.isFailed()) {
-    LOG(FATAL) << "Failed to mark agent " << *slave
-               << " unreachable in the registry: "
-               << registrarResult.failure();
-  }
-
-  CHECK(!registrarResult.isDiscarded());
-
   // `MarkSlaveUnreachable` registry operation should never fail.
-  CHECK(registrarResult.get());
+  CHECK(registrarResult);
+
+  CHECK(slaves.markingUnreachable.contains(slave.id()));
+  slaves.markingUnreachable.erase(slave.id());
 
-  LOG(INFO) << "Marked agent " << *slave << " unreachable: " << message;
+  LOG(INFO) << "Marked agent"
+            << " " << slave.id() << " (" << slave.hostname() << ")"
+            << " unreachable: " << message;
 
   ++metrics->slave_removals;
   ++metrics->slave_removals_reason_unhealthy;
 
-  CHECK(!slaves.unreachable.contains(slave->id));
-  slaves.unreachable[slave->id] = unreachableTime;
+  CHECK(!slaves.unreachable.contains(slave.id()));
+  slaves.unreachable[slave.id()] = unreachableTime;
+
+  if (duringMasterFailover) {
+    CHECK(slaves.recovered.contains(slave.id()));
+    slaves.recovered.erase(slave.id());
+
+    ++metrics->recovery_slave_removals;
 
-  __removeSlave(slave, message, unreachableTime);
+    sendSlaveLost(slave);
+  } else {
+    CHECK(slaves.registered.contains(slave.id()));
+
+    __removeSlave(slaves.registered.get(slave.id()), message, unreachableTime);
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/74b2a0b7/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5e6ba53..cdfd06c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -506,8 +506,21 @@ public:
       const MachineID& machineId,
       const Option<Unavailability>& unavailability);
 
-  void markUnreachable(
-      const SlaveID& slaveId,
+  // Marks the agent unreachable and returns whether the agent was
+  // marked unreachable. Returns false if the agent is already
+  // in a transitioning state or has transitioned into another
+  // state (this includes already being marked unreachable).
+  // The `duringMasterFailover` parameter specifies whether this
+  // agent is transitioning from a recovered state (true) or a
+  // registered state (false).
+  //
+  // Discarding currently not supported.
+  //
+  // Will not return a failure (this will crash the master
+  // internally in the case of a registry failure).
+  process::Future<bool> markUnreachable(
+      const SlaveInfo& slave,
+      bool duringMasterFailover,
       const std::string& message);
 
   void markGone(Slave* slave, const TimeInfo& goneTime);
@@ -707,19 +720,11 @@ protected:
       std::vector<Archive::Framework>&& completedFrameworks);
 
   void _markUnreachable(
-      Slave* slave,
+      const SlaveInfo& slave,
       const TimeInfo& unreachableTime,
+      bool duringMasterFailover,
       const std::string& message,
-      const process::Future<bool>& registrarResult);
-
-  // Mark a slave as unreachable in the registry. Called when the slave
-  // does not re-register in time after a master failover.
-  Nothing markUnreachableAfterFailover(const SlaveInfo& slave);
-
-  void _markUnreachableAfterFailover(
-      const SlaveInfo& slaveInfo,
-      const TimeInfo& unreachableTime,
-      const process::Future<bool>& registrarResult);
+      bool registrarResult);
 
   void sendSlaveLost(const SlaveInfo& slaveInfo);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/74b2a0b7/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 1591920..288a419 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -3644,9 +3644,14 @@ TEST_F(SlaveTest, HealthCheckUnregisterRace)
   EXPECT_CALL(*master.get()->registrar, apply(_))
     .Times(0);
 
+  SlaveInfo slaveInfo;
+  slaveInfo.mutable_id()->CopyFrom(slaveId);
+  slaveInfo.set_hostname("hostname");
+
   process::dispatch(master.get()->pid,
                     &Master::markUnreachable,
-                    slaveId,
+                    slaveInfo,
+                    false,
                     "dummy test case dispatch");
 
   Clock::settle();