You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/05/27 01:19:53 UTC

[4/9] mesos git commit: Introduced executor reconnect retries on the agent.

Introduced executor reconnect retries on the agent.

PID-based v0 executors using Mesos libraries >= 1.1.2 always re-link
with the agent upon receiving the reconnect message. This avoids the
executor replying on a half-open TCP connection to the old agent
(possible if netfilter is dropping packets, see: MESOS-7057).
However, PID-based executors using Mesos libraries < 1.1.2 do not
re-link and are therefore prone to replying on a half-open connection
after the agent restarts. If we only send a single reconnect message,
these "old" executors will reply on their half-open connection,
receive an RST, and think the agent just died. To ensure these "old"
executors can reconnect in the presence of netfilter dropping packets,
we introduced optional retries of the reconnect message. This results
in "old" executors correctly establishing a link when processing the
second reconnect message.

Generally, users should not enable this flag
(--executor_reregistration_retry_interval) unless they are affected
by this issue.

Review: https://reviews.apache.org/r/59584/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6e10737b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6e10737b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6e10737b

Branch: refs/heads/1.3.x
Commit: 6e10737b82de4f936e00273661d509439e13549b
Parents: ae18261
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri May 26 10:17:58 2017 -0400
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri May 26 17:52:53 2017 -0700

----------------------------------------------------------------------
 src/slave/flags.cpp | 22 +++++++++++
 src/slave/flags.hpp |  1 +
 src/slave/slave.cpp | 99 +++++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 117 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6e10737b/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index ba8d428..36e8ac6 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -348,6 +348,28 @@ mesos::internal::slave::Flags::Flags()
         return None();
       });
 
+  // TODO(bmahler): Remove this once v0 executors are no longer supported.
+  add(&Flags::executor_reregistration_retry_interval,
+      "executor_reregistration_retry_interval",
+      "For PID-based executors, how long the agent waits before retrying\n"
+      "the reconnect message sent to the executor during recovery.\n"
+      "NOTE: Do not use this unless you understand the following\n"
+      "(see MESOS-5332): PID-based executors using Mesos libraries >= 1.1.2\n"
+      "always re-link with the agent upon receiving the reconnect message.\n"
+      "This avoids the executor replying on a half-open TCP connection to\n"
+      "the old agent (possible if netfilter is dropping packets,\n"
+      "see: MESOS-7057). However, PID-based executors using Mesos\n"
+      "libraries < 1.1.2 do not re-link and are therefore prone to\n"
+      "replying on a half-open connection after the agent restarts. If we\n"
+      "only send a single reconnect message, these \"old\" executors will\n"
+      "reply on their half-open connection and receive a RST; without any\n"
+      "retries, they will fail to reconnect and be killed by the agent once\n"
+      "the executor re-registration timeout elapses. To ensure these \"old\"\n"
+      "executors can reconnect in the presence of netfilter dropping\n"
+      "packets, we introduced optional retries of the reconnect message.\n"
+      "This results in \"old\" executors correctly establishing a link\n"
+      "when processing the second reconnect message.");
+
   add(&Flags::executor_shutdown_grace_period,
       "executor_shutdown_grace_period",
       "Default amount of time to wait for an executor to shut down\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/6e10737b/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 32466e0..e9f0ddc 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -78,6 +78,7 @@ public:
   Option<JSON::Object> executor_environment_variables;
   Duration executor_registration_timeout;
   Duration executor_reregistration_timeout;
+  Option<Duration> executor_reregistration_retry_interval;
   Duration executor_shutdown_grace_period;
 #ifdef USE_SSL_SOCKET
   Option<Path> executor_secret_key;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6e10737b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a850713..5ebbb87 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -35,6 +35,7 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#include <process/after.hpp>
 #include <process/async.hpp>
 #include <process/check.hpp>
 #include <process/collect.hpp>
@@ -43,6 +44,7 @@
 #include <process/dispatch.hpp>
 #include <process/http.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 #include <process/reap.hpp>
 #include <process/time.hpp>
 
@@ -130,9 +132,13 @@ using std::set;
 using std::string;
 using std::vector;
 
+using process::after;
 using process::async;
 using process::wait; // Necessary on some OS's to disambiguate.
+using process::Break;
 using process::Clock;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -3963,10 +3969,11 @@ void Slave::reregisterExecutor(
         state == RUNNING || state == TERMINATING)
     << state;
 
-  if (state != RECOVERING) {
-    LOG(WARNING) << "Shutting down executor '" << executorId
-                 << "' of framework " << frameworkId
-                 << " because the agent is not in recovery mode";
+  if (state == TERMINATING) {
+    LOG(WARNING) << "Shutting down executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " because the agent is terminating";
+
     reply(ShutdownExecutorMessage());
     return;
   }
@@ -4000,11 +4007,37 @@ void Slave::reregisterExecutor(
     case Executor::TERMINATED:
       // TERMINATED is possible if the executor forks, the parent process
       // terminates and the child process (driver) tries to register!
-    case Executor::RUNNING:
       LOG(WARNING) << "Shutting down executor " << *executor
                    << " because it is in unexpected state " << executor->state;
       reply(ShutdownExecutorMessage());
       break;
+
+    case Executor::RUNNING:
+      if (flags.executor_reregistration_retry_interval.isNone()) {
+        // Previously, when an executor sends a re-registration while
+        // in the RUNNING state, we would shut the executor down. We
+        // preserve that behavior when the optional reconnect retry
+        // is not enabled.
+        LOG(WARNING) << "Shutting down executor " << *executor
+                     << " because it is in unexpected state "
+                     << executor->state;
+        reply(ShutdownExecutorMessage());
+      } else {
+        // When the agent is configured to retry the reconnect requests
+        // to executors, we ignore any further re-registrations. This
+        // is because we can't easily handle re-registering libprocess
+        // based executors in the steady state, and we plan to move to
+        // only allowing v1 HTTP executors (where re-subscription in
+        // the steady state is supported). Also, ignoring this message
+        // ensures that any executors mimicking the libprocess protocol
+        // do not have any illusion of being able to re-register without
+        // an agent restart (hopefully they will commit suicide if they
+        // fail to re-register).
+        LOG(WARNING) << "Ignoring executor re-registration message from "
+                     << *executor << " because it is already registered";
+      }
+      break;
+
     case Executor::REGISTERING: {
       executor->state = Executor::RUNNING;
 
@@ -5915,6 +5948,62 @@ Future<Nothing> Slave::_recover()
           ReconnectExecutorMessage message;
           message.mutable_slave_id()->MergeFrom(info.id());
           send(executor->pid.get(), message);
+
+          // PID-based executors using Mesos libraries >= 1.1.2 always
+          // re-link with the agent upon receiving the reconnect message.
+          // This avoids the executor replying on a half-open TCP
+          // connection to the old agent (possible if netfilter is
+          // dropping packets, see: MESOS-7057). However, PID-based
+          // executors using Mesos libraries < 1.1.2 do not re-link
+          // and are therefore prone to replying on a half-open connection
+          // after the agent restarts. If we only send a single reconnect
+          // message, these "old" executors will reply on their half-open
+          // connection and receive a RST; without any retries, they will
+          // fail to reconnect and be killed by the agent once the executor
+          // re-registration timeout elapses. To ensure these "old"
+          // executors can reconnect in the presence of netfilter dropping
+          // packets, we introduced optional retries of the reconnect
+          // message. This results in "old" executors correctly establishing
+          // a link when processing the second reconnect message.
+          if (flags.executor_reregistration_retry_interval.isSome()) {
+            const Duration& retryInterval =
+              flags.executor_reregistration_retry_interval.get();
+
+            const FrameworkID& frameworkId = framework->id();
+            const ExecutorID& executorId = executor->id;
+
+            process::loop(
+                self(),
+                [retryInterval]() {
+                  return after(retryInterval);
+                },
+                [this, frameworkId, executorId, message](Nothing)
+                    -> ControlFlow<Nothing> {
+                  if (state != RECOVERING) {
+                    return Break();
+                  }
+
+                  Framework* framework = getFramework(frameworkId);
+                  if (framework == nullptr) {
+                    return Break();
+                  }
+
+                  Executor* executor = framework->getExecutor(executorId);
+                  if (executor == nullptr) {
+                    return Break();
+                  }
+
+                  if (executor->state != Executor::REGISTERING) {
+                    return Break();
+                  }
+
+                  LOG(INFO) << "Re-sending reconnect request to executor "
+                            << *executor;
+
+                  send(executor->pid.get(), message);
+                  return Continue();
+                });
+          }
         } else if (executor->pid.isNone()) {
           LOG(INFO) << "Waiting for executor " << *executor
                     << " to subscribe";