You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/12/16 18:38:41 UTC

[1/2] mesos git commit: Refactored IOSwitchboardServerTest.AttachOutput test.

Repository: mesos
Updated Branches:
  refs/heads/master 28eaa8df7 -> 929c3c7b6


Refactored IOSwitchboardServerTest.AttachOutput test.

Pulled out some code into helpers that can be re-used in other tests.
I plan to use them in subsequent patches.

Review: https://reviews.apache.org/r/54727


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/929c3c7b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/929c3c7b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/929c3c7b

Branch: refs/heads/master
Commit: 929c3c7b6cd4681ac1d1d1f4d2c27c86318c1f7d
Parents: 8b09da9
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Dec 13 16:43:31 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 16 13:38:19 2016 -0500

----------------------------------------------------------------------
 .../containerizer/io_switchboard_tests.cpp      | 144 +++++++++++--------
 1 file changed, 87 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/929c3c7b/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 2718071..930f98f 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -16,6 +16,7 @@
 
 #include <map>
 #include <string>
+#include <tuple>
 #include <vector>
 
 #include <process/clock.hpp>
@@ -78,6 +79,7 @@ using testing::Eq;
 
 using std::map;
 using std::string;
+using std::tuple;
 using std::vector;
 
 namespace mesos {
@@ -85,7 +87,77 @@ namespace internal {
 namespace tests {
 
 #ifndef __WINDOWS__
-class IOSwitchboardServerTest : public TemporaryDirectoryTest {};
+class IOSwitchboardServerTest : public TemporaryDirectoryTest
+{
+protected:
+  // Helper that sends `ATTACH_CONTAINER_OUTPUT` request on the given
+  // `connection` and returns the response.
+  //
+  // TODO(vinod): Make this function more generic (e.g., sends any `Call`).
+  Future<http::Response> attachOutput(
+      const ContainerID& containerId,
+      http::Connection connection)
+  {
+    Call call;
+    call.set_type(Call::ATTACH_CONTAINER_OUTPUT);
+
+    Call::AttachContainerOutput* attach =
+      call.mutable_attach_container_output();
+
+    attach->mutable_container_id()->CopyFrom(containerId);
+
+    http::Request request;
+    request.method = "POST";
+    request.url.domain = "";
+    request.url.path = "/";
+    request.keepAlive = true;
+    request.headers["Accept"] = APPLICATION_STREAMING_JSON;
+    request.headers["Content-Type"] = APPLICATION_JSON;
+    request.body = stringify(JSON::protobuf(call));
+
+    return connection.send(request, true);
+  }
+
+  // Reads `ProcessIO::Data` records from the pipe `reader` until EOF is reached
+  // and returns the merged stdout and stderr.
+  // NOTE: It ignores any `ProcessIO::Control` records.
+  //
+  // TODO(vinod): Merge this with the identically named helper in api_tests.cpp.
+  Future<tuple<string, string>> getProcessIOData(http::Pipe::Reader reader)
+  {
+    return reader.readAll()
+      .then([](const string& data) -> Future<tuple<string, string>> {
+        string stdoutReceived;
+        string stderrReceived;
+
+        ::recordio::Decoder<agent::ProcessIO> decoder(lambda::bind(
+            deserialize<agent::ProcessIO>, ContentType::JSON, lambda::_1));
+
+        Try<std::deque<Try<agent::ProcessIO>>> records = decoder.decode(data);
+
+        if (records.isError()) {
+          return process::Failure(records.error());
+        }
+
+        while(!records->empty()) {
+          Try<agent::ProcessIO> record = records->front();
+          records->pop_front();
+
+          if (record.isError()) {
+            return process::Failure(record.error());
+          }
+
+          if (record->data().type() == agent::ProcessIO::Data::STDOUT) {
+            stdoutReceived += record->data().data();
+          } else if (record->data().type() == agent::ProcessIO::Data::STDERR) {
+            stderrReceived += record->data().data();
+          }
+        }
+
+        return std::make_tuple(stdoutReceived, stderrReceived);
+      });
+  }
+};
 
 
 TEST_F(IOSwitchboardServerTest, RedirectLog)
@@ -244,76 +316,37 @@ TEST_F(IOSwitchboardServerTest, AttachOutput)
 
   Future<Nothing> runServer = server.get()->run();
 
-  Call call;
-  call.set_type(Call::ATTACH_CONTAINER_OUTPUT);
-
-  Call::AttachContainerOutput* attach = call.mutable_attach_container_output();
-  attach->mutable_container_id()->set_value(UUID::random().toString());
-
-  http::Request request;
-  request.method = "POST";
-  request.url.domain = "";
-  request.url.path = "/";
-  request.keepAlive = true;
-  request.headers["Accept"] = APPLICATION_STREAMING_JSON;
-  request.headers["Content-Type"] = APPLICATION_JSON;
-  request.body = stringify(JSON::protobuf(call));
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   Try<unix::Address> address = unix::Address::create(socketPath);
   ASSERT_SOME(address);
 
-  Future<http::Connection> _connection = http::connect(
-      address.get(), http::Scheme::HTTP);
+  Future<http::Connection> _connection =
+    http::connect(address.get(), http::Scheme::HTTP);
 
   AWAIT_READY(_connection);
   http::Connection connection = _connection.get();
 
-  Future<http::Response> response = connection.send(request, true);
+  Future<http::Response> response = attachOutput(containerId, connection);
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
   ASSERT_EQ(http::Response::PIPE, response.get().type);
+  ASSERT_SOME(response.get().reader);
 
-  Option<http::Pipe::Reader> reader = response.get().reader;
-  ASSERT_SOME(reader);
-
-  auto deserializer = [](const string& body) {
-    Try<JSON::Value> value = JSON::parse(body);
-    Try<agent::ProcessIO> parse =
-      ::protobuf::parse<agent::ProcessIO>(value.get());
-    return parse;
-  };
-
-  recordio::Reader<agent::ProcessIO> responseDecoder(
-      ::recordio::Decoder<agent::ProcessIO>(deserializer),
-      reader.get());
-
-  string stdoutReceived = "";
-  string stderrReceived = "";
-
-  while (true) {
-    Future<Result<agent::ProcessIO>> _message = responseDecoder.read();
-    AWAIT_READY(_message);
-
-    if (_message->isNone()) {
-      break;
-    }
+  Future<tuple<string, string>> received =
+    getProcessIOData(response->reader.get());
 
-    ASSERT_SOME(_message.get());
-
-    agent::ProcessIO message = _message.get().get();
+  AWAIT_READY(received);
 
-    ASSERT_EQ(agent::ProcessIO::DATA, message.type());
+  string stdoutReceived;
+  string stderrReceived;
 
-    ASSERT_TRUE(message.data().type() == agent::ProcessIO::Data::STDOUT ||
-                message.data().type() == agent::ProcessIO::Data::STDERR);
+  tie(stdoutReceived, stderrReceived) = received.get();
 
-    if (message.data().type() == agent::ProcessIO::Data::STDOUT) {
-      stdoutReceived += message.data().data();
-    } else if (message.data().type() == agent::ProcessIO::Data::STDERR) {
-      stderrReceived += message.data().data();
-    }
-  }
+  EXPECT_EQ(data, stdoutReceived);
+  EXPECT_EQ(data, stderrReceived);
 
   AWAIT_READY(connection.disconnect());
   AWAIT_READY(connection.disconnected());
@@ -323,9 +356,6 @@ TEST_F(IOSwitchboardServerTest, AttachOutput)
   os::close(nullFd.get());
   os::close(stdoutFd.get());
   os::close(stderrFd.get());
-
-  EXPECT_EQ(data, stdoutReceived);
-  EXPECT_EQ(data, stderrReceived);
 }
 
 


[2/2] mesos git commit: Added a test that verifies container attach after agent restart.

Posted by vi...@apache.org.
Added a test that verifies container attach after agent restart.

Review: https://reviews.apache.org/r/54720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b09da9a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b09da9a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b09da9a

Branch: refs/heads/master
Commit: 8b09da9aa6f6ed31c4fd727e546eb10afc2b7a34
Parents: 28eaa8d
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Dec 12 16:42:19 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 16 13:38:19 2016 -0500

----------------------------------------------------------------------
 .../containerizer/io_switchboard_tests.cpp      | 108 +++++++++++++++++++
 1 file changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b09da9a/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index 074c32b..2718071 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -1033,6 +1033,114 @@ TEST_F(IOSwitchboardTest, RecoverThenKillSwitchboardContainerDestroyed)
   driver.stop();
   driver.join();
 }
+
+
+// This test verifies that a container can be attached after a slave restart.
+TEST_F(IOSwitchboardTest, ContainerAttachAfterSlaveRestart)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.launcher = "posix";
+  flags.isolation = "posix/cpu";
+#ifdef __linux__
+  flags.agent_subsystems = None();
+#endif
+
+  Fetcher fetcher;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<MesosContainerizer*> create = MesosContainerizer::create(
+      flags,
+      false,
+      &fetcher);
+
+  ASSERT_SOME(create);
+
+  Owned<MesosContainerizer> containerizer(create.get());
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      detector.get(),
+      containerizer.get(),
+      flags);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  Future<Nothing> _ack =
+    FUTURE_DISPATCH(_, &slave::Slave::_statusUpdateAcknowledgement);
+
+  // Launch a task with tty to start the switchboard server.
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  task.mutable_container()->set_type(ContainerInfo::MESOS);
+  task.mutable_container()->mutable_tty_info();
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusRunning);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_ack);
+
+  // Restart the slave with a new containerizer.
+  slave.get()->terminate();
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &slave::Slave::_recover);
+
+  create = MesosContainerizer::create(
+      flags,
+      false,
+      &fetcher);
+
+  ASSERT_SOME(create);
+
+  containerizer.reset(create.get());
+
+  slave = StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Wait until containerizer is recovered.
+  AWAIT_READY(_recover);
+
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId;
+  containerId.set_value(containers->begin()->value());
+
+  Future<http::Connection> connection = containerizer->attach(containerId);
+  AWAIT_READY(connection);
+
+  driver.stop();
+  driver.join();
+}
+
 #endif // __WINDOWS__
 
 } // namespace tests {