You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/12 23:20:20 UTC

[2/2] mesos git commit: Added a bi-directional heartbeat for IOSwitchboard connections.

Added a bi-directional heartbeat for IOSwitchboard connections.

Review: https://reviews.apache.org/r/54560/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/441e0767
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/441e0767
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/441e0767

Branch: refs/heads/master
Commit: 441e0767f60a14496de67cf17078e912faf2daeb
Parents: c1f9af5
Author: Kevin Klues <kl...@gmail.com>
Authored: Mon Dec 12 15:20:04 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Dec 12 15:20:04 2016 -0800

----------------------------------------------------------------------
 include/mesos/agent/agent.proto                 |   6 ++
 include/mesos/v1/agent/agent.proto              |   6 ++
 .../containerizer/mesos/io/switchboard.cpp      | 106 +++++++++++++++----
 .../containerizer/mesos/io/switchboard.hpp      |  12 ++-
 .../containerizer/mesos/io/switchboard_main.cpp |   3 +-
 5 files changed, 108 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 9d5c3e7..775c14b 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -366,10 +366,16 @@ message ProcessIO {
     enum Type {
       UNKNOWN = 0;
       TTY_INFO = 1;
+      HEARTBEAT = 2;
+    }
+
+    message Heartbeat {
+      optional DurationInfo interval = 1;
     }
 
     optional Type type = 1;
     optional TTYInfo tty_info = 2;
+    optional Heartbeat heartbeat = 3;
   }
 
   optional Type type = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index d0fdf29..a98acb7 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -365,10 +365,16 @@ message ProcessIO {
     enum Type {
       UNKNOWN = 0;
       TTY_INFO = 1;
+      HEARTBEAT = 2;
+    }
+
+    message Heartbeat {
+      optional DurationInfo interval = 1;
     }
 
     optional Type type = 1;
     optional TTYInfo tty_info = 2;
+    optional Heartbeat heartbeat = 3;
   }
 
   optional Type type = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 210556f..f900924 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -511,6 +511,7 @@ Future<Option<ContainerLaunchInfo>> IOSwitchboard::_prepare(
   switchboardFlags.stderr_from_fd = stderrFromFd;
   switchboardFlags.stdout_to_fd = STDOUT_FILENO;
   switchboardFlags.stderr_to_fd = STDERR_FILENO;
+  switchboardFlags.heartbeat_interval = flags.http_heartbeat_interval;
 
   if (containerConfig.container_class() == ContainerClass::DEBUG) {
     switchboardFlags.wait_for_connection = true;
@@ -877,7 +878,8 @@ public:
       int _stderrFromFd,
       int _stderrToFd,
       const unix::Socket& _socket,
-      bool waitForConnection);
+      bool waitForConnection,
+      Option<Duration> heartbeatInterval);
 
   virtual void finalize();
 
@@ -915,6 +917,9 @@ private:
     ::recordio::Encoder<agent::ProcessIO> encoder;
   };
 
+  // Sit in a heartbeat loop forever.
+  void heartbeatLoop();
+
   // Sit in an accept loop forever.
   void acceptLoop();
 
@@ -953,6 +958,7 @@ private:
   int stderrToFd;
   unix::Socket socket;
   bool waitForConnection;
+  Option<Duration> heartbeatInterval;
   bool inputConnected;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
@@ -971,7 +977,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
     int stderrFromFd,
     int stderrToFd,
     const string& socketPath,
-    bool waitForConnection)
+    bool waitForConnection,
+    Option<Duration> heartbeatInterval)
 {
   Try<unix::Socket> socket = unix::Socket::create(SocketImpl::Kind::POLL);
   if (socket.isError()) {
@@ -1004,7 +1011,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
       stderrFromFd,
       stderrToFd,
       socket.get(),
-      waitForConnection);
+      waitForConnection,
+      heartbeatInterval);
 }
 
 
@@ -1016,7 +1024,8 @@ IOSwitchboardServer::IOSwitchboardServer(
     int stderrFromFd,
     int stderrToFd,
     const unix::Socket& socket,
-    bool waitForConnection)
+    bool waitForConnection,
+    Option<Duration> heartbeatInterval)
   : process(new IOSwitchboardServerProcess(
         tty,
         stdinToFd,
@@ -1025,7 +1034,8 @@ IOSwitchboardServer::IOSwitchboardServer(
         stderrFromFd,
         stderrToFd,
         socket,
-        waitForConnection))
+        waitForConnection,
+        heartbeatInterval))
 {
   spawn(process.get());
 }
@@ -1058,7 +1068,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     int _stderrFromFd,
     int _stderrToFd,
     const unix::Socket& _socket,
-    bool _waitForConnection)
+    bool _waitForConnection,
+    Option<Duration> _heartbeatInterval)
   : tty(_tty),
     stdinToFd(_stdinToFd),
     stdoutFromFd(_stdoutFromFd),
@@ -1067,6 +1078,7 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     stderrToFd(_stderrToFd),
     socket(_socket),
     waitForConnection(_waitForConnection),
+    heartbeatInterval(_heartbeatInterval),
     inputConnected(false) {}
 
 
@@ -1159,6 +1171,12 @@ Future<Nothing> IOSwitchboardServerProcess::run()
       return Nothing();
     }));
 
+  // If we have a heartbeat interval set, send a heartbeat to all of
+  // our outstanding output connections at the proper interval.
+  if (heartbeatInterval.isSome()) {
+    heartbeatLoop();
+  }
+
   acceptLoop();
 
   return promise.future();
@@ -1191,6 +1209,28 @@ void IOSwitchboardServerProcess::finalize()
 }
 
 
+void IOSwitchboardServerProcess::heartbeatLoop()
+{
+  CHECK(heartbeatInterval.isSome());
+  const Duration interval = heartbeatInterval.get();
+
+  // Build a `HEARTBEAT` control message that advertises our interval.
+  agent::ProcessIO heartbeat;
+  heartbeat.set_type(agent::ProcessIO::CONTROL);
+  agent::ProcessIO::Control* control = heartbeat.mutable_control();
+  control->set_type(agent::ProcessIO::Control::HEARTBEAT);
+  control->mutable_heartbeat()->mutable_interval()->set_nanoseconds(
+      interval.ns());
+
+  // Send it on every output connection, then re-arm by dispatching
+  // back to ourselves once `interval` has elapsed.
+  foreach (HttpConnection& connection, outputConnections) {
+    connection.send(heartbeat);
+  }
+  delay(interval, self(), &IOSwitchboardServerProcess::heartbeatLoop);
+}
+
+
 void IOSwitchboardServerProcess::acceptLoop()
 {
   socket.accept()
@@ -1366,10 +1406,20 @@ Option<Error> IOSwitchboardServerProcess::validate(
               if (!ttyInfo.has_window_size()) {
                 return Error("Expecting 'tty_info.window_size' to be present");
               }
+
+              return None();
+            }
+            case agent::ProcessIO::Control::HEARTBEAT: {
+              if (!message.control().has_heartbeat()) {
+                return Error(
+                    "Expecting 'process_io.control.heartbeat' to be present");
+              }
+
+              return None();
             }
           }
 
-          return None();
+          UNREACHABLE();
         }
         case agent::ProcessIO::DATA: {
           if (!message.has_data()) {
@@ -1449,22 +1499,34 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
 
         switch (message.type()) {
           case agent::ProcessIO::CONTROL: {
-            // TODO(klueska): Return a failure if the container we are
-            // attaching to does not have a tty associated with it.
-
-            // Update the window size.
-            Try<Nothing> window = os::setWindowSize(
-                stdinToFd,
-                message.control().tty_info().window_size().rows(),
-                message.control().tty_info().window_size().columns());
-
-            if (window.isError()) {
-              *response = http::BadRequest(
-                  "Unable to set the window size: "  + window.error());
-              return false;
-            }
+            // Dispatch on the control sub-type, not the outer message type.
+            switch (message.control().type()) {
+              case agent::ProcessIO::Control::TTY_INFO: {
+                // TODO(klueska): Return a failure if the container we are
+                // attaching to does not have a tty associated with it.
+
+                // Update the window size.
+                Try<Nothing> window = os::setWindowSize(
+                    stdinToFd,
+                    message.control().tty_info().window_size().rows(),
+                    message.control().tty_info().window_size().columns());
+
+                if (window.isError()) {
+                  *response = http::BadRequest(
+                      "Unable to set the window size: "  + window.error());
+                  return false;
+                }
 
-            return true;
+                return true;
+              }
+              case agent::ProcessIO::Control::HEARTBEAT: {
+                // Interval info sent with the heartbeat is ignored for now.
+                return true;
+              }
+              default: {
+                UNREACHABLE();
+              }
+            }
           }
           case agent::ProcessIO::DATA: {
             // Receiving a `DATA` message with length 0 indicates

http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.hpp b/src/slave/containerizer/mesos/io/switchboard.hpp
index fb720f0..9f4ce99 100644
--- a/src/slave/containerizer/mesos/io/switchboard.hpp
+++ b/src/slave/containerizer/mesos/io/switchboard.hpp
@@ -138,7 +138,8 @@ public:
       int stderrFromFd,
       int stderrToFd,
       const std::string& socketPath,
-      bool waitForConnection = false);
+      bool waitForConnection = false,
+      Option<Duration> heartbeatInterval = None());
 
   ~IOSwitchboardServer();
 
@@ -177,7 +178,8 @@ private:
       int stderrFromFd,
       int stderrToFd,
       const process::network::unix::Socket& socket,
-      bool waitForConnection);
+      bool waitForConnection,
+      Option<Duration> heartbeatInterval);
 
   process::Owned<IOSwitchboardServerProcess> process;
 };
@@ -248,6 +250,11 @@ struct IOSwitchboardServerFlags : public virtual flags::FlagsBase
         "The path of the unix domain socket this\n"
         "io switchboard should attach itself to.",
         "");
+
+    add(&IOSwitchboardServerFlags::heartbeat_interval,
+        "heartbeat_interval",
+        "A heartbeat interval (e.g. '5secs', '10mins') for messages to\n"
+        "be sent to any open 'ATTACH_CONTAINER_OUTPUT' connections.");
   }
 
   bool tty;
@@ -258,6 +265,7 @@ struct IOSwitchboardServerFlags : public virtual flags::FlagsBase
   int stderr_to_fd;
   std::string socket_path;
   bool wait_for_connection;
+  Option<Duration> heartbeat_interval;
 };
 #endif // __WINDOWS__
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard_main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard_main.cpp b/src/slave/containerizer/mesos/io/switchboard_main.cpp
index 8c4b30a..7fa56d7 100644
--- a/src/slave/containerizer/mesos/io/switchboard_main.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard_main.cpp
@@ -96,7 +96,8 @@ int main(int argc, char** argv)
       flags.stderr_from_fd,
       flags.stderr_to_fd,
       flags.socket_path,
-      flags.wait_for_connection);
+      flags.wait_for_connection,
+      flags.heartbeat_interval);
 
   if (server.isError()) {
     EXIT(EXIT_FAILURE) << "Failed to create the io switchboard server:"