You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/12/05 18:12:10 UTC

[3/4] mesos git commit: Added support for 'ATTACH_CONTAINER_INPUT' call to the io switchboard.

Added support for 'ATTACH_CONTAINER_INPUT' call to the io switchboard.

Review: https://reviews.apache.org/r/54297/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be45c94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be45c94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be45c94f

Branch: refs/heads/master
Commit: be45c94fc7009851f297ee0403a4bf09f00b6e25
Parents: 02d816e
Author: Kevin Klues <kl...@gmail.com>
Authored: Mon Dec 5 09:45:15 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Mon Dec 5 09:45:15 2016 -0800

----------------------------------------------------------------------
 .../containerizer/mesos/io/switchboard.cpp      | 235 +++++++++++++++++--
 .../containerizer/io_switchboard_tests.cpp      | 136 ++++++++++-
 2 files changed, 355 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/be45c94f/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index d5211b9..3b6d326 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -29,8 +29,8 @@
 #include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/io.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
-
 #include <process/process.hpp>
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
@@ -607,6 +607,13 @@ private:
   // with the same format we receive them in.
   Future<http::Response> handler(const http::Request& request);
 
+  // Validate `ATTACH_CONTAINER_INPUT` calls.
+  //
+  // TODO(klueska): Move this to `src/slave/validation.hpp` and make
+  // the agent validate all the calls before forwarding them to the
+  // switchboard.
+  Option<Error> validate(const agent::Call::AttachContainerInput& call);
+
   // Handle `ATTACH_CONTAINER_INPUT` calls.
   Future<http::Response> attachContainerInput(
       const Owned<recordio::Reader<agent::Call>>& reader);
@@ -628,11 +635,12 @@ private:
   int stderrToFd;
   unix::Socket socket;
   bool waitForConnection;
+  bool inputConnected;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
   // The following must be a `std::list`
   // for proper erase semantics later on.
-  list<HttpConnection> connections;
+  list<HttpConnection> outputConnections;
   Option<Failure> failure;
 };
 
@@ -734,15 +742,12 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     stderrFromFd(_stderrFromFd),
     stderrToFd(_stderrToFd),
     socket(_socket),
-    waitForConnection(_waitForConnection) {}
+    waitForConnection(_waitForConnection),
+    inputConnected(false) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
 {
-  // TODO(jieyu): This silence the compiler warning of private field
-  // being not used. Remove this once it is used.
-  stdinToFd = -1;
-
   if (!waitForConnection) {
     startRedirect.set(Nothing());
   }
@@ -838,7 +843,7 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 
 void IOSwitchboardServerProcess::finalize()
 {
-  foreach (HttpConnection& connection, connections) {
+  foreach (HttpConnection& connection, outputConnections) {
     connection.close();
   }
 
@@ -934,14 +939,21 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
           [=](const Result<agent::Call>& call) -> Future<http::Response> {
             if (call.isNone()) {
               return http::BadRequest(
-                  "Received EOF while reading request body");
+                  "IOSwitchboard received EOF while reading request body");
             }
 
             if (call.isError()) {
               return Failure(call.error());
             }
 
+            // Should have already been validated by the agent.
+            CHECK(call->has_type());
             CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call->type());
+            CHECK(call->has_attach_container_input());
+            CHECK_EQ(mesos::agent::Call::AttachContainerInput::CONTAINER_ID,
+                     call->attach_container_input().type());
+            CHECK(call->attach_container_input().has_container_id());
+            CHECK(call->attach_container_input().container_id().has_value());
 
             return attachContainerInput(reader);
           }));
@@ -957,6 +969,8 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
               return http::BadRequest(call.error());
             }
 
+            // Should have already been validated by the agent.
+            CHECK(call->has_type());
             CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call->type());
 
             return attachContainerOutput(acceptType);
@@ -965,10 +979,203 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
 }
 
 
+Option<Error> IOSwitchboardServerProcess::validate(
+    const agent::Call::AttachContainerInput& call)
+{
+  switch (call.type()) {
+    case agent::Call::AttachContainerInput::UNKNOWN:
+    case agent::Call::AttachContainerInput::CONTAINER_ID: {
+        return Error(
+            "Expecting 'attach_container_input.type' to be 'PROCESS_IO'"
+             " instead of: '" + stringify(call.type()) + "'");
+    } // Both non-'PROCESS_IO' types are rejected.
+    case agent::Call::AttachContainerInput::PROCESS_IO: {
+      if (!call.has_process_io()) {
+        return Error(
+            "Expecting 'attach_container_input.process_io' to be present");
+      }
+
+      const agent::ProcessIO& message = call.process_io();
+
+      if (!message.has_type()) {
+        return Error("Expecting 'process_io.type' to be present");
+      }
+
+      switch (message.type()) {
+        case agent::ProcessIO::UNKNOWN: {
+          return Error("'process_io.type' is unknown");
+        }
+        case agent::ProcessIO::CONTROL: {
+          if (!message.has_control()) {
+            return Error("Expecting 'process_io.control' to be present");
+          }
+
+          if (!message.control().has_type()) {
+            return Error("Expecting 'process_io.control.type' to be present");
+          }
+
+          switch (message.control().type()) {
+            case agent::ProcessIO::Control::UNKNOWN: {
+              return Error("'process_io.control.type' is unknown");
+            }
+            case agent::ProcessIO::Control::TTY_INFO: {
+              if (!message.control().has_tty_info()) {
+                return Error(
+                    "Expecting 'process_io.control.tty_info' to be present");
+              }
+
+              const TTYInfo& ttyInfo = message.control().tty_info();
+
+              if (!ttyInfo.has_window_size()) {
+                return Error("Expecting 'tty_info.window_size' to be present");
+              }
+            } // Valid `TTY_INFO`: fall out to `return None()` below.
+          }
+
+          return None();
+        }
+        case agent::ProcessIO::DATA: {
+          if (!message.has_data()) {
+            return Error("Expecting 'process_io.data' to be present");
+          }
+
+          if (!message.data().has_type()) {
+            return Error("Expecting 'process_io.data.type' to be present");
+          }
+
+          if (message.data().type() != agent::ProcessIO::Data::STDIN) {
+            return Error("Expecting 'process_io.data.type' to be 'STDIN'");
+          }
+
+          if (!message.data().has_data()) {
+            return Error("Expecting 'process_io.data.data' to be present");
+          }
+
+          return None();
+        }
+      }
+    }
+  }
+
+  UNREACHABLE(); // Every `case` listed above returns.
+}
+
+
 Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
     const Owned<recordio::Reader<agent::Call>>& reader)
 {
-  return http::NotImplemented("ATTACH_CONTAINER_INPUT");
+  // Only allow a single input connection at a time.
+  if (inputConnected) {
+    return http::Conflict("Multiple input connections are not allowed");
+  }
+
+  // Mark the input as connected. The `.then()` continuation at the
+  // bottom of this function resets the flag once the asynchronous
+  // loop below has finished, allowing a new input connection.
+  // NOTE(review): confirm the flag is also reset if the loop fails.
+  inputConnected = true;
+
+  // Loop through each record and process it. Return a proper
+  // response once the last record has been fully processed.
+  Owned<http::Response> response(new http::Response());
+
+  return loop(
+      self(),
+      [=]() {
+        return reader->read();
+      },
+      [=](const Result<agent::Call>& record) -> Future<bool> {
+        if (record.isNone()) {
+          *response = http::OK();
+          return false;
+        }
+
+        if (record.isError()) {
+          *response = http::BadRequest(record.error());
+          return false;
+        }
+
+        // Should have already been validated by the agent.
+        CHECK(record->has_type());
+        CHECK_EQ(mesos::agent::Call::ATTACH_CONTAINER_INPUT, record->type());
+        CHECK(record->has_attach_container_input());
+
+        // Validate the rest of the `AttachContainerInput` message.
+        Option<Error> error = validate(record->attach_container_input());
+        if (error.isSome()) {
+          *response = http::BadRequest(error->message);
+          return false;
+        }
+
+        const agent::ProcessIO& message =
+          record->attach_container_input().process_io();
+
+        switch (message.type()) {
+          case agent::ProcessIO::CONTROL: {
+            // TODO(klueska): Return a failure if the container we are
+            // attaching to does not have a tty associated with it.
+
+            // Update the window size.
+            Try<Nothing> window = os::setWindowSize(
+                stdinToFd,
+                message.control().tty_info().window_size().rows(),
+                message.control().tty_info().window_size().columns());
+
+            if (window.isError()) {
+              *response = http::BadRequest(
+                  "Unable to set the window size: "  + window.error());
+              return false;
+            }
+
+            return true;
+          }
+          case agent::ProcessIO::DATA: {
+            // Write the STDIN data to `stdinToFd`. If there is a
+            // failure, we set the `failure` member variable and exit
+            // the loop. In the resulting `.then()` callback, we then
+            // terminate the process. We don't terminate the process
+            // here because we want to propagate an `InternalServerError`
+            // back to the client.
+            Owned<Promise<bool>> condition(new Promise<bool>());
+
+            process::io::write(stdinToFd, message.data().data())
+              .onAny(defer(self(), [this, condition, response](
+                  const Future<Nothing>& future) {
+                if (future.isReady()) {
+                  condition->set(true);
+                  return;
+                }
+
+                failure = Failure(
+                    "Failed writing to stdin:"
+                    " " + (future.isFailed() ? future.failure() : "discarded"));
+
+                *response = http::InternalServerError(failure->message);
+
+                condition->set(false);
+              }));
+
+            return condition->future();
+          }
+          default: {
+            UNREACHABLE();
+          }
+        }
+      })
+    .then(defer(self(), [this, response](
+        const Future<Nothing> future) -> http::Response {
+      // Reset `inputConnected` to allow future input connections.
+      inputConnected = false;
+
+      // We only set `failure` if writing to `stdin` failed, in which
+      // case we want to terminate ourselves (after flushing any
+      // outstanding messages from our message queue).
+      if (failure.isSome()) {
+        terminate(self(), false);
+      }
+
+      return *response;
+    }));
 }
 
 
@@ -987,7 +1194,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
   // connection. If we ever detect a connection has been closed,
   // we remove it from this list.
   HttpConnection connection(pipe.writer(), acceptType);
-  auto iterator = connections.insert(connections.end(), connection);
+  auto iterator = outputConnections.insert(outputConnections.end(), connection);
 
   // We use the `startRedirect` promise to indicate when we should
   // begin reading data from our `stdoutFromFd` and `stderrFromFd`
@@ -1002,7 +1209,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
     .then(defer(self(), [this, iterator]() {
       // Erasing from a `std::list` only invalidates the iterator of
       // the object being erased. All other iterators remain valid.
-      connections.erase(iterator);
+      outputConnections.erase(iterator);
       return Nothing();
     }));
 
@@ -1015,7 +1222,7 @@ void IOSwitchboardServerProcess::outputHook(
     const agent::ProcessIO::Data::Type& type)
 {
   // Break early if there are no connections to send the data to.
-  if (connections.size() == 0) {
+  if (outputConnections.size() == 0) {
     return;
   }
 
@@ -1032,7 +1239,7 @@ void IOSwitchboardServerProcess::outputHook(
   // the `HttpConnection::closed()` call above. We might do a few
   // unnecessary writes if we have a bunch of messages queued up,
   // but that shouldn't be a problem.
-  foreach (HttpConnection& connection, connections) {
+  foreach (HttpConnection& connection, outputConnections) {
     connection.send(message);
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/be45c94f/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 131ca94..c8fe876 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -47,6 +47,7 @@ namespace unix = process::network::unix;
 #endif // __WINDOWS__
 
 using mesos::agent::Call;
+using mesos::agent::ProcessIO;
 
 using mesos::internal::slave::Fetcher;
 using mesos::internal::slave::IOSwitchboardServer;
@@ -69,7 +70,7 @@ namespace tests {
 class IOSwitchboardServerTest : public TemporaryDirectoryTest {};
 
 
-TEST_F(IOSwitchboardServerTest, ServerRedirectLog)
+TEST_F(IOSwitchboardServerTest, RedirectLog)
 {
   int stdoutPipe[2];
   int stderrPipe[2];
@@ -158,7 +159,7 @@ TEST_F(IOSwitchboardServerTest, ServerRedirectLog)
 }
 
 
-TEST_F(IOSwitchboardServerTest, ServerAttachOutput)
+TEST_F(IOSwitchboardServerTest, AttachOutput)
 {
   Try<int> nullFd = os::open("/dev/null", O_RDWR);
   ASSERT_SOME(nullFd);
@@ -308,6 +309,136 @@ TEST_F(IOSwitchboardServerTest, ServerAttachOutput)
 }
 
 
+TEST_F(IOSwitchboardServerTest, AttachInput)
+{
+  // We use a pipe in this test to prevent the switchboard from
+  // reading EOF on its `stdoutFromFd` until we are ready for the
+  // switchboard to terminate.
+  int stdoutPipe[2];
+
+  Try<Nothing> pipe = os::pipe(stdoutPipe);
+  ASSERT_SOME(pipe);
+
+  Try<int> nullFd = os::open("/dev/null", O_RDWR);
+  ASSERT_SOME(nullFd);
+
+  string stdinPath = path::join(sandbox.get(), "stdin");
+  Try<int> stdinFd = os::open(
+      stdinPath,
+      O_RDWR | O_CREAT,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  ASSERT_SOME(stdinFd);
+
+  string socketPath = path::join(
+      sandbox.get(),
+      "mesos-io-switchboard-" + UUID::random().toString());
+
+  Try<Owned<IOSwitchboardServer>> server = IOSwitchboardServer::create(
+      false,
+      stdinFd.get(),
+      stdoutPipe[0],
+      nullFd.get(),
+      nullFd.get(),
+      nullFd.get(),
+      socketPath,
+      false);
+
+  ASSERT_SOME(server);
+
+  Future<Nothing> runServer = server.get()->run();
+
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data.size()) < Megabytes(1)) { // Double until >= 1MB.
+    data.append(data);
+  }
+
+  http::Pipe requestPipe;
+  http::Pipe::Reader reader = requestPipe.reader();
+  http::Pipe::Writer writer = requestPipe.writer();
+
+  http::Request request;
+  request.method = "POST";
+  request.type = http::Request::PIPE;
+  request.reader = reader;
+  request.url.domain = "";
+  request.url.path = "/";
+  request.keepAlive = true;
+  request.headers["Accept"] = APPLICATION_JSON;
+  request.headers["Content-Type"] = APPLICATION_STREAMING_JSON;
+
+  Try<unix::Address> address = unix::Address::create(socketPath);
+  ASSERT_SOME(address);
+
+  Future<http::Connection> _connection = http::connect(address.get());
+  AWAIT_READY(_connection);
+  http::Connection connection = _connection.get();
+
+  Future<http::Response> response = connection.send(request);
+
+  ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
+        serialize, ContentType::STREAMING_JSON, lambda::_1));
+
+  Call call;
+  call.set_type(Call::ATTACH_CONTAINER_INPUT);
+
+  Call::AttachContainerInput* attach = call.mutable_attach_container_input();
+  attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
+  attach->mutable_container_id()->set_value(UUID::random().toString());
+
+  writer.write(encoder.encode(call)); // Initial `CONTAINER_ID` record.
+
+  size_t offset = 0;
+  size_t chunkSize = 4096; // Bytes of `data` per `PROCESS_IO` record.
+  while (offset < data.length()) {
+    string dataChunk = data.substr(offset, chunkSize);
+    offset += chunkSize;
+
+    Call call;
+    call.set_type(Call::ATTACH_CONTAINER_INPUT);
+
+    Call::AttachContainerInput* attach = call.mutable_attach_container_input();
+    attach->set_type(Call::AttachContainerInput::PROCESS_IO);
+
+    ProcessIO* message = attach->mutable_process_io();
+    message->set_type(ProcessIO::DATA);
+    message->mutable_data()->set_type(ProcessIO::Data::STDIN);
+    message->mutable_data()->set_data(dataChunk);
+
+    writer.write(encoder.encode(call));
+  }
+
+  writer.close(); // Presumably yields EOF on the server side.
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+  AWAIT_READY(connection.disconnect());
+  AWAIT_READY(connection.disconnected());
+
+  // Closing the write end of `stdoutPipe`
+  // will trigger the switchboard to exit.
+  os::close(stdoutPipe[1]);
+  AWAIT_ASSERT_READY(runServer);
+
+  os::close(stdoutPipe[0]);
+  os::close(nullFd.get());
+  os::close(stdinFd.get());
+
+  Try<string> stdinData = os::read(stdinPath);
+  ASSERT_SOME(stdinData);
+
+  EXPECT_EQ(data, stdinData.get());
+}
+
+
 class IOSwitchboardTest
   : public ContainerizerTest<slave::MesosContainerizer> {};
 
@@ -376,6 +507,7 @@ TEST_F(IOSwitchboardTest, OutputRedirectionWithTTY)
 
   EXPECT_SOME_EQ("HelloWorld", os::read(path::join(directory.get(), "stdout")));
 }
+
 #endif // __WINDOWS__
 
 } // namespace tests {