You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/12 23:20:20 UTC
[2/2] mesos git commit: Added a bi-directional heartbeat for
IOSwitchboard connections.
Added a bi-directional heartbeat for IOSwitchboard connections.
Review: https://reviews.apache.org/r/54560/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/441e0767
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/441e0767
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/441e0767
Branch: refs/heads/master
Commit: 441e0767f60a14496de67cf17078e912faf2daeb
Parents: c1f9af5
Author: Kevin Klues <kl...@gmail.com>
Authored: Mon Dec 12 15:20:04 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Dec 12 15:20:04 2016 -0800
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 6 ++
include/mesos/v1/agent/agent.proto | 6 ++
.../containerizer/mesos/io/switchboard.cpp | 106 +++++++++++++++----
.../containerizer/mesos/io/switchboard.hpp | 12 ++-
.../containerizer/mesos/io/switchboard_main.cpp | 3 +-
5 files changed, 108 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 9d5c3e7..775c14b 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -366,10 +366,16 @@ message ProcessIO {
enum Type {
UNKNOWN = 0;
TTY_INFO = 1;
+ HEARTBEAT = 2;
+ }
+
+ message Heartbeat {
+ optional DurationInfo interval = 1;
}
optional Type type = 1;
optional TTYInfo tty_info = 2;
+ optional Heartbeat heartbeat = 3;
}
optional Type type = 1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index d0fdf29..a98acb7 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -365,10 +365,16 @@ message ProcessIO {
enum Type {
UNKNOWN = 0;
TTY_INFO = 1;
+ HEARTBEAT = 2;
+ }
+
+ message Heartbeat {
+ optional DurationInfo interval = 1;
}
optional Type type = 1;
optional TTYInfo tty_info = 2;
+ optional Heartbeat heartbeat = 3;
}
optional Type type = 1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 210556f..f900924 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -511,6 +511,7 @@ Future<Option<ContainerLaunchInfo>> IOSwitchboard::_prepare(
switchboardFlags.stderr_from_fd = stderrFromFd;
switchboardFlags.stdout_to_fd = STDOUT_FILENO;
switchboardFlags.stderr_to_fd = STDERR_FILENO;
+ switchboardFlags.heartbeat_interval = flags.http_heartbeat_interval;
if (containerConfig.container_class() == ContainerClass::DEBUG) {
switchboardFlags.wait_for_connection = true;
@@ -877,7 +878,8 @@ public:
int _stderrFromFd,
int _stderrToFd,
const unix::Socket& _socket,
- bool waitForConnection);
+ bool waitForConnection,
+ Option<Duration> heartbeatInterval);
virtual void finalize();
@@ -915,6 +917,9 @@ private:
::recordio::Encoder<agent::ProcessIO> encoder;
};
+ // Sit in a heartbeat loop forever.
+ void heartbeatLoop();
+
// Sit in an accept loop forever.
void acceptLoop();
@@ -953,6 +958,7 @@ private:
int stderrToFd;
unix::Socket socket;
bool waitForConnection;
+ Option<Duration> heartbeatInterval;
bool inputConnected;
Promise<Nothing> promise;
Promise<Nothing> startRedirect;
@@ -971,7 +977,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
int stderrFromFd,
int stderrToFd,
const string& socketPath,
- bool waitForConnection)
+ bool waitForConnection,
+ Option<Duration> heartbeatInterval)
{
Try<unix::Socket> socket = unix::Socket::create(SocketImpl::Kind::POLL);
if (socket.isError()) {
@@ -1004,7 +1011,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
stderrFromFd,
stderrToFd,
socket.get(),
- waitForConnection);
+ waitForConnection,
+ heartbeatInterval);
}
@@ -1016,7 +1024,8 @@ IOSwitchboardServer::IOSwitchboardServer(
int stderrFromFd,
int stderrToFd,
const unix::Socket& socket,
- bool waitForConnection)
+ bool waitForConnection,
+ Option<Duration> heartbeatInterval)
: process(new IOSwitchboardServerProcess(
tty,
stdinToFd,
@@ -1025,7 +1034,8 @@ IOSwitchboardServer::IOSwitchboardServer(
stderrFromFd,
stderrToFd,
socket,
- waitForConnection))
+ waitForConnection,
+ heartbeatInterval))
{
spawn(process.get());
}
@@ -1058,7 +1068,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
int _stderrFromFd,
int _stderrToFd,
const unix::Socket& _socket,
- bool _waitForConnection)
+ bool _waitForConnection,
+ Option<Duration> _heartbeatInterval)
: tty(_tty),
stdinToFd(_stdinToFd),
stdoutFromFd(_stdoutFromFd),
@@ -1067,6 +1078,7 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
stderrToFd(_stderrToFd),
socket(_socket),
waitForConnection(_waitForConnection),
+ heartbeatInterval(_heartbeatInterval),
inputConnected(false) {}
@@ -1159,6 +1171,12 @@ Future<Nothing> IOSwitchboardServerProcess::run()
return Nothing();
}));
+ // If we have a heartbeat interval set, send a heartbeat to all of
+ // our outstanding output connections at the proper interval.
+ if (heartbeatInterval.isSome()) {
+ heartbeatLoop();
+ }
+
acceptLoop();
return promise.future();
@@ -1191,6 +1209,28 @@ void IOSwitchboardServerProcess::finalize()
}
+void IOSwitchboardServerProcess::heartbeatLoop()
+{ // Sends one heartbeat to each output connection, then re-schedules itself.
+ CHECK(heartbeatInterval.isSome()); // `run()` only starts this loop when an interval is set.
+ // Build a CONTROL message of control type HEARTBEAT that carries the configured interval in nanoseconds.
+ agent::ProcessIO message;
+ message.set_type(agent::ProcessIO::CONTROL);
+ message.mutable_control()->set_type(
+ agent::ProcessIO::Control::HEARTBEAT);
+ message.mutable_control()->mutable_heartbeat()
+ ->mutable_interval()->set_nanoseconds(heartbeatInterval.get().ns());
+ // Send the same message to every connection currently held in `outputConnections`.
+ foreach (HttpConnection& connection, outputConnections) {
+ connection.send(message);
+ }
+
+ // Dispatch back to ourselves after the `heartbeatInterval`; the
+ // re-scheduling is unconditional, so this repeats once per interval.
+ delay(heartbeatInterval.get(),
+ self(),
+ &IOSwitchboardServerProcess::heartbeatLoop);
+}
+
+
void IOSwitchboardServerProcess::acceptLoop()
{
socket.accept()
@@ -1366,10 +1406,20 @@ Option<Error> IOSwitchboardServerProcess::validate(
if (!ttyInfo.has_window_size()) {
return Error("Expecting 'tty_info.window_size' to be present");
}
+
+ return None();
+ }
+ case agent::ProcessIO::Control::HEARTBEAT: {
+ if (!message.control().has_heartbeat()) {
+ return Error(
+ "Expecting 'process_io.control.heartbeat' to be present");
+ }
+
+ return None();
}
}
- return None();
+ UNREACHABLE();
}
case agent::ProcessIO::DATA: {
if (!message.has_data()) {
@@ -1449,22 +1499,34 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
switch (message.type()) {
case agent::ProcessIO::CONTROL: {
- // TODO(klueska): Return a failure if the container we are
- // attaching to does not have a tty associated with it.
-
- // Update the window size.
- Try<Nothing> window = os::setWindowSize(
- stdinToFd,
- message.control().tty_info().window_size().rows(),
- message.control().tty_info().window_size().columns());
-
- if (window.isError()) {
- *response = http::BadRequest(
- "Unable to set the window size: " + window.error());
- return false;
- }
+ switch (message.control().type()) { // Dispatch on the nested control type; the enclosing case already matched the outer CONTROL type.
+ case agent::ProcessIO::Control::TTY_INFO: {
+ // TODO(klueska): Return a failure if the container we are
+ // attaching to does not have a tty associated with it.
+
+ // Update the window size.
+ Try<Nothing> window = os::setWindowSize(
+ stdinToFd,
+ message.control().tty_info().window_size().rows(),
+ message.control().tty_info().window_size().columns());
+
+ if (window.isError()) {
+ *response = http::BadRequest(
+ "Unable to set the window size: " + window.error());
+ return false;
+ }
- return true;
+ return true;
+ }
+ case agent::ProcessIO::Control::HEARTBEAT: {
+ // For now, we ignore any interval information
+ // sent along with the heartbeat.
+ return true;
+ }
+ default: {
+ UNREACHABLE();
+ }
+ }
}
case agent::ProcessIO::DATA: {
// Receiving a `DATA` message with length 0 indicates
http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.hpp b/src/slave/containerizer/mesos/io/switchboard.hpp
index fb720f0..9f4ce99 100644
--- a/src/slave/containerizer/mesos/io/switchboard.hpp
+++ b/src/slave/containerizer/mesos/io/switchboard.hpp
@@ -138,7 +138,8 @@ public:
int stderrFromFd,
int stderrToFd,
const std::string& socketPath,
- bool waitForConnection = false);
+ bool waitForConnection = false,
+ Option<Duration> heartbeatInterval = None());
~IOSwitchboardServer();
@@ -177,7 +178,8 @@ private:
int stderrFromFd,
int stderrToFd,
const process::network::unix::Socket& socket,
- bool waitForConnection);
+ bool waitForConnection,
+ Option<Duration> heartbeatInterval);
process::Owned<IOSwitchboardServerProcess> process;
};
@@ -248,6 +250,11 @@ struct IOSwitchboardServerFlags : public virtual flags::FlagsBase
"The path of the unix domain socket this\n"
"io switchboard should attach itself to.",
"");
+
+ add(&IOSwitchboardServerFlags::heartbeat_interval,
+ "heartbeat_interval",
+ "A heartbeat interval (e.g. '5secs', '10mins') for messages to\n"
+ "be sent to any open 'ATTACH_CONTAINER_OUTPUT' connections.");
}
bool tty;
@@ -258,6 +265,7 @@ struct IOSwitchboardServerFlags : public virtual flags::FlagsBase
int stderr_to_fd;
std::string socket_path;
bool wait_for_connection;
+ Option<Duration> heartbeat_interval;
};
#endif // __WINDOWS__
http://git-wip-us.apache.org/repos/asf/mesos/blob/441e0767/src/slave/containerizer/mesos/io/switchboard_main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard_main.cpp b/src/slave/containerizer/mesos/io/switchboard_main.cpp
index 8c4b30a..7fa56d7 100644
--- a/src/slave/containerizer/mesos/io/switchboard_main.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard_main.cpp
@@ -96,7 +96,8 @@ int main(int argc, char** argv)
flags.stderr_from_fd,
flags.stderr_to_fd,
flags.socket_path,
- flags.wait_for_connection);
+ flags.wait_for_connection,
+ flags.heartbeat_interval);
if (server.isError()) {
EXIT(EXIT_FAILURE) << "Failed to create the io switchboard server:"