You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/13 01:32:02 UTC

[3/3] mesos git commit: Made changes to the executor library around managing connections.

Made changes to the executor library around managing connections.

This change makes the following modifications to the library:

- Removes passing connection objects to `defer` callbacks, as doing so can
sometimes lead to deadlocks around destruction in the same execution
context.
- Introduces 3 additional states: `CONNECTING`, `SUBSCRIBING` and
`SUBSCRIBED`. The `CONNECTING` state helps us identify whether a
connection attempt is in progress, while the latter two states allow
us to drop subscribe calls if one is already in progress or the
executor is already subscribed.
- Creates a random `connectionId` to demarcate a new connection
instance, allowing a stale connection attempt to be discarded.
- Sets the recovery timeout timer only once per disconnection.
This allows us to later cancel the `recoveryTimeout` callback
if we reconnect with the agent at a later point in time.

Review: https://reviews.apache.org/r/44580/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/295b5802
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/295b5802
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/295b5802

Branch: refs/heads/master
Commit: 295b58022523c636d4331af760e5d3ba6f8e4703
Parents: 441c278
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sat Mar 12 19:31:33 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Mar 12 19:31:33 2016 -0500

----------------------------------------------------------------------
 src/executor/executor.cpp | 263 +++++++++++++++++++++++++++++------------
 1 file changed, 187 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/295b5802/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index 5d00b11..0050e22 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -21,6 +21,7 @@
 #include <mesos/v1/mesos.hpp>
 
 #include <process/async.hpp>
+#include <process/clock.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
@@ -30,12 +31,14 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
+#include <process/timer.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/uuid.hpp>
 
 #include "common/http.hpp"
 #include "common/recordio.hpp"
@@ -52,6 +55,7 @@
 using namespace mesos;
 using namespace mesos::internal;
 
+using std::ostream;
 using std::queue;
 using std::string;
 
@@ -60,11 +64,13 @@ using mesos::internal::recordio::Reader;
 using mesos::internal::slave::validation::executor::call::validate;
 
 using process::async;
+using process::Clock;
 using process::delay;
 using process::dispatch;
 using process::Future;
 using process::Mutex;
 using process::Owned;
+using process::Timer;
 
 using process::ID::generate;
 
@@ -245,17 +251,26 @@ public:
 
   void send(const Call& call)
   {
-    if (state == DISCONNECTED) {
-      drop(call, "Disconnected");
-      return;
-    }
-
     Option<Error> error = validate(devolve(call));
     if (error.isSome()) {
       drop(call, error->message);
       return;
     }
 
+    if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
+      // It might be possible that the executor is retrying. We drop the
+      // request if we have an ongoing subscribe request in flight or if the
+      // executor is already subscribed.
+      drop(call, "Executor is in state " + stringify(state));
+      return;
+    }
+
+    if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
+      // We drop all non-subscribe calls if we are not currently subscribed.
+      drop(call, "Executor is in state " + stringify(state));
+      return;
+    }
+
     VLOG(1) << "Sending " << call.type() << " call to " << agent;
 
     ::Request request;
@@ -270,18 +285,25 @@ public:
 
     Future<Response> response;
     if (call.type() == Call::SUBSCRIBE) {
+      state = SUBSCRIBING;
+
       // Send a streaming request for Subscribe call.
       response = connections->subscribe.send(request, true);
     } else {
       response = connections->nonSubscribe.send(request);
     }
 
-    response.onAny(defer(self(), &Self::_send, call, lambda::_1));
+    CHECK_SOME(connectionId);
+    response.onAny(defer(self(),
+                         &Self::_send,
+                         connectionId.get(),
+                         call,
+                         lambda::_1));
   }
 
   ~MesosProcess()
   {
-    close();
+    disconnect();
   }
 
 protected:
@@ -292,25 +314,48 @@ protected:
 
   void connect()
   {
-    if (state == CONNECTED) {
-      return;
-    }
+    CHECK(state == DISCONNECTED || state == CONNECTING) << state;
+
+    connectionId = UUID::random();
+
+    state = CONNECTING;
+
+    // This automatic variable is needed for lambda capture. We need to
+    // create a copy here because `connectionId` might change by the time the
+    // second `http::connect()` gets called.
+    UUID connectionId_ = connectionId.get();
 
     // We create two persistent connections here, one for subscribe
     // call/streaming response and another for non-subscribe calls/responses.
     process::http::connect(agent)
-      .onAny(defer(self(), [this](const Future<Connection>& connection) {
+      .onAny(defer(self(), [this, connectionId_](
+                               const Future<Connection>& connection) {
         process::http::connect(agent)
-          .onAny(defer(self(), &Self::connected, connection, lambda::_1));
+          .onAny(defer(self(),
+                       &Self::connected,
+                       connectionId_,
+                       connection,
+                       lambda::_1));
       }));
   }
 
   void connected(
+      const UUID& _connectionId,
       const Future<Connection>& connection1,
       const Future<Connection>& connection2)
   {
+    // It is possible that the agent process failed while we have an ongoing
+    // (re-)connection attempt with the agent.
+    if (connectionId != _connectionId) {
+      VLOG(1) << "Ignoring connection attempt from stale connection";
+      return;
+    }
+
+    CHECK_EQ(CONNECTING, state);
+    CHECK_SOME(connectionId);
+
     if (!connection1.isReady()) {
-      disconnected(connections,
+      disconnected(connectionId.get(),
                    connection1.isFailed()
                      ? connection1.failure()
                      : "Subscribe future discarded");
@@ -318,7 +363,7 @@ protected:
     }
 
     if (!connection2.isReady()) {
-      disconnected(connections,
+      disconnected(connectionId.get(),
                    connection2.isFailed()
                      ? connection2.failure()
                      : "Non-subscribe future discarded");
@@ -334,15 +379,25 @@ protected:
     connections->subscribe.disconnected()
       .onAny(defer(self(),
                    &Self::disconnected,
-                   connections,
+                   connectionId.get(),
                    "Subscribe connection interrupted"));
 
     connections->nonSubscribe.disconnected()
       .onAny(defer(self(),
                    &Self::disconnected,
-                   connections,
+                   connectionId.get(),
                    "Non-subscribe connection interrupted"));
 
+    // Cancel the recovery timer if we connected after a disconnection with the
+    // agent when framework checkpointing is enabled. This ensures that we have
+    // only one active timer instance at a given point of time.
+    if (recoveryTimer.isSome()) {
+      CHECK(checkpoint);
+
+      Clock::cancel(recoveryTimer.get());
+      recoveryTimer = None();
+    }
+
     // Invoke the connected callback once we have established both subscribe
     // and non-subscribe connections with the agent.
     mutex.lock()
@@ -353,52 +408,55 @@ protected:
   }
 
   void disconnected(
-      const Option<Connections>& _connections,
+      const UUID& _connectionId,
       const string& failure)
   {
-    // Ignore if we are already disconnected from the agent.
-    if (state == DISCONNECTED) {
-      return;
-    }
-
     // Ignore if the disconnection happened from an old stale connection.
-    if (connections != _connections) {
+    if (connectionId != _connectionId) {
+      VLOG(1) << "Ignoring disconnection attempt from stale connection";
       return;
     }
 
+    CHECK_NE(DISCONNECTED, state);
+
     VLOG(1) << "Disconnected from agent: " << failure;
 
-    state = DISCONNECTED;
+    bool connected =
+      (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED);
 
-    // NOTE: We will be here if either subscribe or non-subscribe connection is
-    // disconnected. We explicitly disconnect both the connections here for
-    // simplicity.
-    CHECK_SOME(connections);
+    if (connected) {
+      // Invoke the disconnected callback the first time we disconnect from
+      // the agent.
+      mutex.lock()
+        .then(defer(self(), [this]() {
+          return async(callbacks.disconnected);
+        }))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
 
-    connections->subscribe.disconnect();
-    connections->nonSubscribe.disconnect();
+    // Disconnect any active connections.
+    disconnect();
 
-    // Close the old subscribed response stream.
-    close();
+    // This represents a disconnection due to a backoff attempt after being
+    // already disconnected from the agent. We had already started the
+    // recovery timer when we initially noticed the disconnection.
+    if (recoveryTimer.isSome()) {
+      CHECK(checkpoint);
 
-    // Invoke the disconnected callback the first time we disconnect from
-    // the agent.
-    mutex.lock()
-      .then(defer(self(), [this]() {
-        return async(callbacks.disconnected);
-      }))
-      .onAny(lambda::bind(&Mutex::unlock, mutex));
+      return;
+    }
 
-    if (checkpoint) {
+    if (checkpoint && connected) {
       CHECK_SOME(recoveryTimeout);
+      CHECK_NONE(recoveryTimer);
 
-      CHECK_SOME(connections);
-
-      // Set up recovery timeout upon disconnection.
-      delay(recoveryTimeout.get(),
-            self(),
-            &Self::_recoveryTimeout,
-            connections.get());
+      // Set up the recovery timeout upon disconnection. We only set it once per
+      // disconnection. This ensures that when we try to (re-)connect with
+      // the agent and are unsuccessful, we don't restart the recovery timer.
+      recoveryTimer = delay(
+          recoveryTimeout.get(),
+          self(),
+          &Self::_recoveryTimeout);
 
       // Backoff and reconnect only if framework checkpointing is enabled.
       backoff();
@@ -409,10 +467,12 @@ protected:
 
   void backoff()
   {
-    if (state == CONNECTED) {
+    if (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED) {
       return;
     }
 
+    CHECK(state == DISCONNECTED || state == CONNECTING) << state;
+
     CHECK(checkpoint);
     CHECK_SOME(maxBackoff);
 
@@ -434,26 +494,42 @@ protected:
     return future;
   }
 
-  void _recoveryTimeout(const Connections& _connections)
+  void _recoveryTimeout()
   {
-    // If we're connected, don't do anything.
-    if (state == CONNECTED) {
+    // It's possible that a new connection was established since the timeout
+    // fired and we were unable to cancel this timeout. If this occurs, don't
+    // bother trying to shutdown the executor.
+    if (recoveryTimer.isNone() || !recoveryTimer->timeout().expired()) {
       return;
     }
 
-    // Ignore if this timeout is for a stale connection.
-    if (connections != _connections) {
-      return;
-    }
+    CHECK(state == DISCONNECTED || state == CONNECTING) << state;
 
     CHECK_SOME(recoveryTimeout);
-
     LOG(INFO) << "Recovery timeout of " << recoveryTimeout.get()
               << " exceeded; Shutting down";
 
     shutdown();
   }
 
+  void disconnect()
+  {
+    if (connections.isSome()) {
+      connections->subscribe.disconnect();
+      connections->nonSubscribe.disconnect();
+    }
+
+    if (subscribed.isSome()) {
+      subscribed->reader.close();
+    }
+
+    state = DISCONNECTED;
+
+    connections = None();
+    connectionId = None();
+    subscribed = None();
+  }
+
   // Helper for injecting an ERROR event.
   void error(const string& message)
   {
@@ -466,9 +542,19 @@ protected:
     receive(event, true);
   }
 
-  void _send(const Call& call, const Future<Response>& response)
+  void _send(
+      const UUID& _connectionId,
+      const Call& call,
+      const Future<Response>& response)
   {
+    // It is possible that the agent process failed before a response could
+    // be received.
+    if (connectionId != _connectionId) {
+      return;
+    }
+
     CHECK(!response.isDiscarded());
+    CHECK(state == SUBSCRIBING || state == SUBSCRIBED) << state;
 
     // This can happen if the agent process is restarted or a network blip
     // caused the socket to timeout. Eventually, the executor would
@@ -485,6 +571,8 @@ protected:
       CHECK_EQ(response->type, Response::PIPE);
       CHECK_SOME(response->reader);
 
+      state = SUBSCRIBED;
+
       Pipe::Reader reader = response->reader.get();
 
       auto deserializer =
@@ -493,9 +581,6 @@ protected:
       Owned<Reader<Event>> decoder(
           new Reader<Event>(Decoder<Event>(deserializer), reader));
 
-      // Close previous reader.
-      close();
-
       subscribed = SubscribedResponse {reader, decoder};
 
       read();
@@ -508,6 +593,13 @@ protected:
       return;
     }
 
+    // We reset the state to connected if the subscribe call did not
+    // succceed (e.g., the agent has not yet set up HTTP routes). The executor
+    // can then retry the subscribe call.
+    if (call.type() == Call::SUBSCRIBE) {
+      state = CONNECTED;
+    }
+
     if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
       // This could happen if the agent is still in the process of recovery.
       LOG(WARNING) << "Received '" << response->status << "' ("
@@ -550,21 +642,25 @@ protected:
       return;
     }
 
+    CHECK_EQ(SUBSCRIBED, state);
+    CHECK_SOME(connectionId);
+
     // This could happen if the agent process died while sending a response.
     if (event.isFailed()) {
       LOG(ERROR) << "Failed to decode the stream of events: "
                  << event.failure();
 
-      disconnected(connections, event.failure());
+      disconnected(connectionId.get(), event.failure());
       return;
     }
 
+    // This could happen if the agent failed over after sending an event.
     if (event->isNone()) {
       const string error =  "End-Of-File received from agent. The agent closed "
                             "the event stream";
       LOG(ERROR) << error;
 
-      disconnected(connections, error);
+      disconnected(connectionId.get(), error);
       return;
     }
 
@@ -577,20 +673,12 @@ protected:
     read();
   }
 
-  void close()
-  {
-    if (subscribed.isSome()) {
-      subscribed->reader.close();
-      subscribed = None();
-    }
-  }
-
   void receive(const Event& event, bool isLocallyInjected)
   {
-    // Check if we're disconnected but received an event.
-    if (!isLocallyInjected && state == DISCONNECTED) {
+    // Check if we're no longer subscribed but received an event.
+    if (!isLocallyInjected && state != SUBSCRIBED) {
       LOG(WARNING) << "Ignoring " << stringify(event.type())
-                   << " event because we're disconnected";
+                   << " event because we're no longer subscribed";
       return;
     }
 
@@ -646,12 +734,34 @@ private:
     process::Owned<Reader<Event>> decoder;
   };
 
-  enum
+  enum State
   {
-    CONNECTED,   // Established subscribe and non-subscribe connection.
-    DISCONNECTED // Either of subscribe/non-subscribe connection is broken.
+    DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
+    CONNECTING, // Trying to establish subscribe and non-subscribe connections.
+    CONNECTED, // Established subscribe and non-subscribe connections.
+    SUBSCRIBING, // Trying to subscribe with the agent.
+    SUBSCRIBED // Subscribed with the agent.
   } state;
 
+  friend ostream& operator<<(ostream& stream, State state)
+  {
+    switch (state) {
+      case DISCONNECTED: return stream << "DISCONNECTED";
+      case CONNECTING:   return stream << "CONNECTING";
+      case CONNECTED:    return stream << "CONNECTED";
+      case SUBSCRIBING:  return stream << "SUBSCRIBING";
+      case SUBSCRIBED:   return stream << "SUBSCRIBED";
+    }
+
+    UNREACHABLE();
+  }
+
+  // There can be multiple simulataneous ongoing (re-)connection attempts with
+  // the agent (e.g., the agent process restarted while an attempt was in
+  // progress). This helps us in uniquely identifying the current connection
+  // instance and ignoring the stale instance.
+  Option<UUID> connectionId; // UUID to identify the connection instance.
+
   ContentType contentType;
   Callbacks callbacks;
   Mutex mutex; // Used to serialize the callback invocations.
@@ -663,6 +773,7 @@ private:
   bool checkpoint;
   Option<Duration> recoveryTimeout;
   Option<Duration> maxBackoff;
+  Option<Timer> recoveryTimer;
   Duration shutdownGracePeriod;
 };