You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/09/18 18:12:59 UTC

[mesos] branch 1.5.x updated (5a7ad47 -> d51ec24)

This is an automated email from the ASF dual-hosted git repository.

alexr pushed a change to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 5a7ad47  Added MESOS-8871 to the 1.5.2 CHANGELOG.
     new 5a50899  Fixed IOSwitchboard waiting EOF from attach container input request.
     new 25de607  Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.
     new fa6eb85  Added `AgentAPITest.AttachContainerInputRepeat` test.
     new 3bf4fe2  Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.
     new 33a6bec  Fixed broken pipe error in IOSwitchboard.
     new d51ec24  Added MESOS-8545 and MESOS-9131 to the 1.5.2 CHANGELOG.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG                                        |   2 +
 src/slave/containerizer/mesos/io/switchboard.cpp | 105 +++--
 src/slave/http.cpp                               |  28 +-
 src/slave/http.hpp                               |   3 +
 src/tests/api_tests.cpp                          | 466 +++++++++++++++++++++++
 src/tests/containerizer/io_switchboard_tests.cpp |  17 +
 6 files changed, 593 insertions(+), 28 deletions(-)


[mesos] 03/06: Added `AgentAPITest.AttachContainerInputRepeat` test.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fa6eb85fd2a8798842855628495c16664bc68652
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:07 2018 +0200

    Added `AgentAPITest.AttachContainerInputRepeat` test.
    
    This test verifies that we can call `ATTACH_CONTAINER_INPUT` more
    than once. We send a short message first then we send a long message
    in chunks.
    
    Review: https://reviews.apache.org/r/68231/
    (cherry picked from commit 7ad390b3aa261f4a39ff7f2c0842f2aae39005f4)
---
 src/tests/api_tests.cpp | 218 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 218 insertions(+)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index a3b2690..0144033 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5950,6 +5950,224 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
 }
 
 
+// This test verifies that we can call `ATTACH_CONTAINER_INPUT` more than once.
+// We send a short message first, then we send a long message in chunks.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
+{
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_STARTING, status->state());
+
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+  ASSERT_EQ(1u, containerIds->size());
+
+  // Launch a nested container session that runs `cat`.
+  v1::ContainerID containerId;
+  containerId.set_value(id::UUID::random().toString());
+  containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+  v1::agent::Call call;
+  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+  call.mutable_launch_nested_container_session()->mutable_container_id()
+    ->CopyFrom(containerId);
+
+  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
+      "cat");
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+    slave.get()->pid,
+    "api/v1",
+    headers,
+    serialize(contentType, call),
+    stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type"));
+  ASSERT_NONE(response->headers.get(MESSAGE_CONTENT_TYPE));
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  auto attachContainerInput = [&](const std::string& data, bool sendEOF) {
+    http::Pipe pipe;
+    http::Pipe::Writer writer = pipe.writer();
+    http::Pipe::Reader reader = pipe.reader();
+
+    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+        serialize, contentType, lambda::_1));
+
+    {
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+      attach->mutable_container_id()->CopyFrom(containerId);
+
+      writer.write(encoder.encode(call));
+    }
+
+    size_t offset = 0;
+    size_t chunkSize = 4096;
+    while (offset < data.length()) {
+      string dataChunk = data.substr(offset, chunkSize);
+      offset += chunkSize;
+
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+      v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+      processIO->set_type(v1::agent::ProcessIO::DATA);
+      processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+      processIO->mutable_data()->set_data(dataChunk);
+
+      writer.write(encoder.encode(call));
+    }
+
+    // Signal `EOF` to the 'cat' command.
+    if (sendEOF) {
+      v1::agent::Call call;
+      call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+      v1::agent::Call::AttachContainerInput* attach =
+        call.mutable_attach_container_input();
+
+      attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+      v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+      processIO->set_type(v1::agent::ProcessIO::DATA);
+      processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+      processIO->mutable_data()->set_data("");
+
+      writer.write(encoder.encode(call));
+    }
+
+    writer.close();
+
+    // TODO(anand): Add a `post()` overload that handles request streaming.
+    {
+      http::URL agent = http::URL(
+          "http",
+          slave.get()->pid.address.ip,
+          slave.get()->pid.address.port,
+          slave.get()->pid.id +
+          "/api/v1");
+
+      Future<http::Connection> _connection = http::connect(agent);
+      AWAIT_READY(_connection);
+
+      http::Connection connection = _connection.get(); // Remove const.
+
+      http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+      headers["Content-Type"] = stringify(ContentType::RECORDIO);
+      headers[MESSAGE_CONTENT_TYPE] = stringify(contentType);
+
+      http::Request request;
+      request.url = agent;
+      request.method = "POST";
+      request.type = http::Request::PIPE;
+      request.reader = reader;
+      request.headers = headers;
+
+      Future<http::Response> response = connection.send(request);
+      AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+    }
+  };
+
+  // Prepare the data to send to `cat` and send it over an
+  // `ATTACH_CONTAINER_INPUT` stream.
+  string data1 = "Hello, World!";
+  string data2 =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data2.size()) < Megabytes(1)) {
+    data2.append(data2);
+  }
+
+  attachContainerInput(data1, false);
+  attachContainerInput(data2, true);
+
+  ASSERT_SOME(response->reader);
+  Future<tuple<string, string>> received =
+    getProcessIOData(contentType, response->reader.get());
+
+  AWAIT_READY(received);
+
+  string stdoutReceived;
+  string stderrReceived;
+
+  tie(stdoutReceived, stderrReceived) = received.get();
+
+  EXPECT_EQ(stdoutReceived, data1 + data2);
+
+  ASSERT_TRUE(stderrReceived.empty());
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that attaching to the output of a container fails if the
 // containerizer doesn't support the operation.
 TEST_P(AgentAPITest, AttachContainerOutputFailure)


[mesos] 05/06: Fixed broken pipe error in IOSwitchboard.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 33a6bec95b44592d626874ae8deaa3e2a3bbc120
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:20 2018 +0200

    Fixed broken pipe error in IOSwitchboard.
    
    We force IOSwitchboard to return a final response to the client for the
    `ATTACH_CONTAINER_INPUT` call after IO redirects are finished. In this
    case, we don't read remaining messages from the input stream. So the
    agent might send an acknowledgment for the request before IOSwitchboard
    has received remaining messages. We need to delay termination of
    IOSwitchboard to give it a chance to read the remaining messages.
    Otherwise, the agent might get `HTTP 500` "broken pipe" while
    attempting to write the final message.
    
    Review: https://reviews.apache.org/r/62187/
    (cherry picked from commit c5cf4d49f47579b5a6cb7afc2f7df7c8f51dc6d0)
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 0b28022..7db7c3e 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1601,9 +1601,14 @@ IOSwitchboardServerProcess::acknowledgeContainerInputResponse()
   if (--numPendingAcknowledgments == 0) {
     // If IO redirects are finished or writing to `stdin` failed we want to
     // terminate ourselves (after flushing any outstanding messages from our
-    // message queue).
+    // message queue). Since IOSwitchboard might receive an acknowledgment for
+    // the `ATTACH_CONTAINER_INPUT` request before reading a final message from
+    // the corresponding connection, we need to delay our termination to give
+    // IOSwitchboard a chance to read the final message. Otherwise, the agent
+    // might get `HTTP 500` "broken pipe" while attempting to write the final
+    // message.
     if (!redirectFinished.future().isPending() || failure.isSome()) {
-      terminate(self(), false);
+      after(Seconds(1)).onAny([=]() { terminate(self(), false); });
     }
   }
   return http::OK();


[mesos] 06/06: Added MESOS-8545 and MESOS-9131 to the 1.5.2 CHANGELOG.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d51ec24eaf6527aa6d0d024c3905ec7060363472
Author: Alexander Rukletsov <al...@apache.org>
AuthorDate: Tue Sep 18 20:12:46 2018 +0200

    Added MESOS-8545 and MESOS-9131 to the 1.5.2 CHANGELOG.
---
 CHANGELOG | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG b/CHANGELOG
index 02c8fa2..51428f6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -5,6 +5,7 @@ Release Notes - Mesos - Version 1.5.2 (WIP)
 ** Bug
   * [MESOS-3790] - ZooKeeper connection should retry on `EAI_NONAME`.
   * [MESOS-8418] - mesos-agent high cpu usage because of numerous /proc/mounts reads.
+  * [MESOS-8545] - AgentAPIStreamingTest.AttachInputToNestedContainerSession is flaky.
   * [MESOS-8568] - Command checks should always call `WAIT_NESTED_CONTAINER` before `REMOVE_NESTED_CONTAINER`
   * [MESOS-8830] - Agent gc on old slave sandboxes could empty persistent volume data
   * [MESOS-8871] - Agent may fail to recover if the agent dies before image store cache checkpointed.
@@ -27,6 +28,7 @@ Release Notes - Mesos - Version 1.5.2 (WIP)
   * [MESOS-9116] - Launch nested container session fails due to incorrect detection of `mnt` namespace of command executor's task.
   * [MESOS-9125] - Port mapper CNI plugin might fail with "Resource temporarily unavailable"
   * [MESOS-9127] - Port mapper CNI plugin might deadlock iptables on the agent.
+  * [MESOS-9131] - Health checks launching nested containers while a container is being destroyed lead to unkillable tasks.
   * [MESOS-9142] - CNI detach might fail due to missing network config file.
   * [MESOS-9144] - Master authentication handling leads to request amplification.
   * [MESOS-9145] - Master has a fragile burned-in 5s authentication timeout.


[mesos] 02/06: Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 25de60746de4681ed0d858cba0790372f03ff840
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:01 2018 +0200

    Added `AgentAPITest.LaunchNestedContainerSessionKillTask` test.
    
    This test verifies that IOSwitchboard, which holds an open HTTP input
    connection, terminates once IO redirects finish for the corresponding
    nested container.
    
    Review: https://reviews.apache.org/r/68230/
    (cherry picked from commit e941d206f651bde861675a6517a89e44d1f61a34)
---
 src/tests/api_tests.cpp | 248 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 248 insertions(+)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 946964a..a3b2690 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5702,6 +5702,254 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
 }
 
 
+// This test verifies that IOSwitchboard, which holds an open HTTP input
+// connection, terminates once IO redirects finish for the corresponding
+// nested container:
+//   1. Launches a parent container `sleep 1000` via the default executor.
+//   2. Launches "sh" as a nested container session.
+//   3. Attaches to the nested container's input via an
+//      `ATTACH_CONTAINER_INPUT` call to send the "pkill sleep" command
+//      to "sh", which kills the parent container.
+//   4. Check that all containers have been terminated.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    AgentAPITest,
+    ROOT_CGROUPS_LaunchNestedContainerSessionKillTask)
+{
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux,namespaces/pid";
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> updateStarting;
+  Future<v1::scheduler::Event::Update> updateRunning;
+  Future<v1::scheduler::Event::Update> updateFailed;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateStarting),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateRunning),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+      DoAll(
+          FutureArg<1>(&updateFailed),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::Resources resources =
+    v1::Resources::parse(defaultTaskResourcesString).get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+
+  AWAIT_READY(updateStarting);
+  ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
+  AWAIT_READY(updateRunning);
+  ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
+  ASSERT_TRUE(updateRunning->status().has_container_status());
+
+  v1::ContainerStatus status = updateRunning->status().container_status();
+
+  ASSERT_TRUE(status.has_container_id());
+  EXPECT_TRUE(status.container_id().has_parent());
+
+  // Launch "sh" command via `LAUNCH_NESTED_CONTAINER_SESSION` call.
+  v1::ContainerID containerId;
+  containerId.mutable_parent()->CopyFrom(status.container_id());
+  containerId.set_value(id::UUID::random().toString());
+
+  Future<http::Response> sessionResponse;
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+    call.mutable_launch_nested_container_session()->mutable_container_id()
+      ->CopyFrom(containerId);
+
+    call.mutable_launch_nested_container_session()->mutable_command()
+      ->CopyFrom(v1::createCommandInfo("sh"));
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::RECORDIO);
+    headers[MESSAGE_ACCEPT] = stringify(contentType);
+
+    sessionResponse = http::streaming::post(
+        slave.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
+  }
+
+  // Send "pkill sleep" to "sh" via `ATTACH_CONTAINER_INPUT` call.
+  // Note that we do not close the input connection because we want to
+  // emulate a user's interactive debug session.
+  http::Pipe pipe;
+  http::Pipe::Writer writer = pipe.writer();
+  http::Pipe::Reader reader = pipe.reader();
+
+  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
+      serialize, contentType, lambda::_1));
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
+    attach->mutable_container_id()->CopyFrom(containerId);
+
+    writer.write(encoder.encode(call));
+  }
+
+  const std::string command = "pkill sleep\n";
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
+
+    v1::agent::Call::AttachContainerInput* attach =
+      call.mutable_attach_container_input();
+
+    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);
+
+    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
+    processIO->set_type(v1::agent::ProcessIO::DATA);
+    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
+    processIO->mutable_data()->set_data(command);
+
+    writer.write(encoder.encode(call));
+  }
+
+  {
+    // TODO(anand): Add a `post()` overload that handles request streaming.
+    http::URL agent = http::URL(
+        "http",
+        slave.get()->pid.address.ip,
+        slave.get()->pid.address.port,
+        slave.get()->pid.id +
+        "/api/v1");
+
+    Future<http::Connection> _connection = http::connect(agent);
+    AWAIT_READY(_connection);
+
+    http::Connection connection = _connection.get(); // Remove const.
+
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Content-Type"] = stringify(ContentType::RECORDIO);
+    headers[MESSAGE_CONTENT_TYPE] = stringify(contentType);
+
+    http::Request request;
+    request.url = agent;
+    request.method = "POST";
+    request.type = http::Request::PIPE;
+    request.reader = reader;
+    request.headers = headers;
+
+    Future<http::Response> response = connection.send(request);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION.
+  ASSERT_SOME(sessionResponse->reader);
+
+  Option<http::Pipe::Reader> output = sessionResponse->reader.get();
+  ASSERT_SOME(output);
+
+  Future<tuple<string, string>> received =
+    getProcessIOData(contentType, output.get());
+
+  AWAIT_READY(received);
+
+  string stdoutReceived;
+  string stderrReceived;
+
+  tie(stdoutReceived, stderrReceived) = received.get();
+
+  ASSERT_TRUE(stdoutReceived.empty());
+  ASSERT_TRUE(stderrReceived.empty());
+
+  // Check terminal status update of the task.
+  AWAIT_READY(updateFailed);
+
+  ASSERT_EQ(v1::TASK_FAILED, updateFailed->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateFailed->status().task_id());
+}
+
+
 // This test verifies that attaching to the output of a container fails if the
 // containerizer doesn't support the operation.
 TEST_P(AgentAPITest, AttachContainerOutputFailure)


[mesos] 01/06: Fixed IOSwitchboard waiting EOF from attach container input request.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5a5089938f13a5aafc0a4ee3308f33e76374c408
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:09:31 2018 +0200

    Fixed IOSwitchboard waiting EOF from attach container input request.
    
    Previously, when a corresponding nested container terminated, while the
    user was attached to the container's stdin via `ATTACH_CONTAINER_INPUT`
    IOSwitchboard didn't terminate immediately. IOSwitchboard was waiting
    for EOF message from the input HTTP connection. Since the IOSwitchboard
    was stuck, the corresponding nested container was also stuck in
    `DESTROYING` state.
    
    This patch fixes the aforementioned issue by sending 200 `OK` response
    for `ATTACH_CONTAINER_INPUT` call in the case when io redirect is
    finished while reading from the HTTP input connection is not.
    
    Review: https://reviews.apache.org/r/68232/
    (cherry picked from commit 2fdc8f3cffc5eac91e5f2b0c6aef2254acfc2bd0)
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 74 +++++++++++++++---------
 1 file changed, 48 insertions(+), 26 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 6d167f5..e76d3bd 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -1017,10 +1017,11 @@ private:
   bool waitForConnection;
   Option<Duration> heartbeatInterval;
   bool inputConnected;
-  bool redirectFinished;  // Set when both stdout and stderr redirects finish.
   Future<unix::Socket> accept;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
+  // Set when both stdout and stderr redirects finish.
+  Promise<http::Response> redirectFinished;
   // The following must be a `std::list`
   // for proper erase semantics later on.
   list<HttpConnection> outputConnections;
@@ -1151,8 +1152,7 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     socket(_socket),
     waitForConnection(_waitForConnection),
     heartbeatInterval(_heartbeatInterval),
-    inputConnected(false),
-    redirectFinished(false) {}
+    inputConnected(false) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
@@ -1210,10 +1210,11 @@ Future<Nothing> IOSwitchboardServerProcess::run()
       // switchboard process early.
       //
       // If our IO redirects are finished and there is an input connected,
-      // then we postpone our termination until either a container closes
-      // its `stdin` or a client closes the input connection so that we can
-      // guarantee returning a http response for `ATTACH_CONTAINER_INPUT`
-      // request before terminating ourselves.
+      // then we set `redirectFinished` promise which triggers a callback for
+      // `attachContainerInput()`. This callback returns a final `HTTP 200`
+      // response to the client, even if the client has not yet sent the EOF
+      // message. So we postpone our termination until we send a final
+      // response to the client.
       //
       // NOTE: We always call `terminate()` with `false` to ensure
       // that our event queue is drained before actually terminating.
@@ -1244,9 +1245,9 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 
       collect(stdoutRedirect, stderrRedirect)
         .then(defer(self(), [this]() {
-          redirectFinished = true;
-
-          if (!inputConnected) {
+          if (inputConnected) {
+            redirectFinished.set(http::OK());
+          } else {
             terminate(self(), false);
           }
           return Nothing();
@@ -1594,7 +1595,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
 
   // Loop through each record and process it. Return a proper
   // response once the last record has been fully processed.
-  return loop(
+  auto readLoop = loop(
       self(),
       [=]() {
         return reader->read();
@@ -1685,22 +1686,43 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
             UNREACHABLE();
           }
         }
-      })
-    // We explicitly specify the return type to avoid a type deduction
-    // issue in some versions of clang. See MESOS-2943.
-    .then(defer(self(), [=](const http::Response& response) -> http::Response {
-      // Reset `inputConnected` to allow future input connections.
-      inputConnected = false;
-
-      // If IO redirects are finished or writing to `stdin` failed we want
-      // to terminate ourselves (after flushing any outstanding messages
-      // from our message queue).
-      if (redirectFinished || failure.isSome()) {
-        terminate(self(), false);
-      }
+      });
 
-      return response;
-    }));
+  // We create a new promise, which is transitioned to `READY` when either
+  // the read loop finishes or IO redirects finish. Once this promise is set,
+  // we return a final response to the client.
+  //
+  // TODO(abudnik): Ideally, we would have used `process::select()` to capture a
+  // transition into a terminal state for any of `{readLoop, redirectFinished}`.
+  // However, `select()` currently does not capture a future that has failed.
+  // Another alternative would be to allow `promise::associate()` to accept
+  // multiple source futures.
+  Owned<Promise<http::Response>> promise(new Promise<http::Response>());
+
+  auto setPromise = [promise](const Future<http::Response>& response) {
+    promise->set(response);
+  };
+
+  readLoop.onAny(defer(self(), setPromise));
+
+  redirectFinished.future().onAny(defer(self(), setPromise));
+
+  // We explicitly specify the return type to avoid a type deduction
+  // issue in some versions of clang. See MESOS-2943.
+  return promise->future().then(
+      defer(self(), [=](const http::Response& response) -> http::Response {
+        // Reset `inputConnected` to allow future input connections.
+        inputConnected = false;
+
+        // If IO redirects are finished or writing to `stdin` failed we want
+        // to terminate ourselves (after flushing any outstanding messages
+        // from our message queue).
+        if (!redirectFinished.future().isPending() || failure.isSome()) {
+          terminate(self(), false);
+        }
+
+        return response;
+      }));
 }
 
 


[mesos] 04/06: Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexr pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3bf4fe22e0ed828a36d5b2ea652d07c6eef4b578
Author: Andrei Budnik <ab...@mesosphere.com>
AuthorDate: Tue Sep 18 19:10:14 2018 +0200

    Fixed HTTP errors caused by dropped HTTP responses by IOSwitchboard.
    
    Previously, IOSwitchboard process could terminate before all HTTP
    responses had been sent to the agent. In the case of
    `ATTACH_CONTAINER_INPUT` call, we could drop a final HTTP `200 OK`
    response, so the agent got broken HTTP connection for the call.
    This patch introduces an acknowledgment for the received response
    for the `ATTACH_CONTAINER_INPUT` call. This acknowledgment is a new
    type of control messages for the `ATTACH_CONTAINER_INPUT` call. When
    IOSwitchboard receives an acknowledgment, and io redirects are
    finished, it terminates itself. That guarantees that the agent always
    receives a response for the `ATTACH_CONTAINER_INPUT` call.
    
    Review: https://reviews.apache.org/r/65168/
    (cherry picked from commit 5b95bb0f21852058d22703385f2c8e139881bf1a)
---
 src/slave/containerizer/mesos/io/switchboard.cpp | 52 ++++++++++++++++++------
 src/slave/http.cpp                               | 28 ++++++++++++-
 src/slave/http.hpp                               |  3 ++
 src/tests/containerizer/io_switchboard_tests.cpp | 17 ++++++++
 4 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index e76d3bd..0b28022 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -992,6 +992,9 @@ private:
   // switchboard.
   Option<Error> validate(const agent::Call::AttachContainerInput& call);
 
+  // Handle acknowledgment for `ATTACH_CONTAINER_INPUT` call.
+  Future<http::Response> acknowledgeContainerInputResponse();
+
   // Handle `ATTACH_CONTAINER_INPUT` calls.
   Future<http::Response> attachContainerInput(
       const Owned<recordio::Reader<agent::Call>>& reader);
@@ -1017,6 +1020,10 @@ private:
   bool waitForConnection;
   Option<Duration> heartbeatInterval;
   bool inputConnected;
+  // Each time the agent receives a response for `ATTACH_CONTAINER_INPUT`
+  // request it sends an acknowledgment. This counter is used to delay
+  // IOSwitchboard termination until all acknowledgments are received.
+  size_t numPendingAcknowledgments;
   Future<unix::Socket> accept;
   Promise<Nothing> promise;
   Promise<Nothing> startRedirect;
@@ -1152,7 +1159,8 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     socket(_socket),
     waitForConnection(_waitForConnection),
     heartbeatInterval(_heartbeatInterval),
-    inputConnected(false) {}
+    inputConnected(false),
+    numPendingAcknowledgments(0) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
@@ -1209,12 +1217,12 @@ Future<Nothing> IOSwitchboardServerProcess::run()
       // containers with this behavior and we will exit out of the
       // switchboard process early.
       //
-      // If our IO redirects are finished and there is an input connected,
-      // then we set `redirectFinished` promise which triggers a callback for
+      // If our IO redirects are finished and there are pending
+      // acknowledgments for `ATTACH_CONTAINER_INPUT` requests, then
+      // we set `redirectFinished` promise which triggers a callback for
       // `attachContainerInput()`. This callback returns a final `HTTP 200`
       // response to the client, even if the client has not yet sent the EOF
-      // message. So we postpone our termination until we send a final
-      // response to the client.
+      // message.
       //
       // NOTE: We always call `terminate()` with `false` to ensure
       // that our event queue is drained before actually terminating.
@@ -1245,7 +1253,7 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 
       collect(stdoutRedirect, stderrRedirect)
         .then(defer(self(), [this]() {
-          if (inputConnected) {
+          if (numPendingAcknowledgments > 0) {
             redirectFinished.set(http::OK());
           } else {
             terminate(self(), false);
@@ -1353,6 +1361,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
 {
   CHECK_EQ("POST", request.method);
 
+  if (request.url.path == "/acknowledge_container_input_response") {
+    return acknowledgeContainerInputResponse();
+  }
+
   Option<string> contentType_ = request.headers.get("Content-Type");
   CHECK_SOME(contentType_);
 
@@ -1579,9 +1591,30 @@ Option<Error> IOSwitchboardServerProcess::validate(
 }
 
 
+Future<http::Response>
+IOSwitchboardServerProcess::acknowledgeContainerInputResponse()
+{
+  // Check if this is an acknowledgment sent by the agent. This acknowledgment
+  // means that response for `ATTACH_CONTAINER_INPUT` call has been received by
+  // the agent.
+  CHECK_GT(numPendingAcknowledgments, 0u);
+  if (--numPendingAcknowledgments == 0) {
+    // If IO redirects are finished or writing to `stdin` failed we want to
+    // terminate ourselves (after flushing any outstanding messages from our
+    // message queue).
+    if (!redirectFinished.future().isPending() || failure.isSome()) {
+      terminate(self(), false);
+    }
+  }
+  return http::OK();
+}
+
+
 Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
     const Owned<recordio::Reader<agent::Call>>& reader)
 {
+  ++numPendingAcknowledgments;
+
   // Only allow a single input connection at a time.
   if (inputConnected) {
     return http::Conflict("Multiple input connections are not allowed");
@@ -1714,13 +1747,6 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
         // Reset `inputConnected` to allow future input connections.
         inputConnected = false;
 
-        // If IO redirects are finished or writing to `stdin` failed we want
-        // to terminate ourselves (after flushing any outstanding messages
-        // from our message queue).
-        if (!redirectFinished.future().isPending() || failure.isSome()) {
-          terminate(self(), false);
-        }
-
         return response;
       }));
 }
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 3abcc1e..498f4f0 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -3194,8 +3194,7 @@ Future<Response> Http::_attachContainerInput(
       std::move(decoder), encoder, writer);
 
   return slave->containerizer->attach(containerId)
-    .then([mediaTypes, reader, writer, transform](
-        Connection connection) mutable {
+    .then([=](Connection connection) mutable {
       Request request;
       request.method = "POST";
       request.type = Request::PIPE;
@@ -3230,6 +3229,31 @@ Future<Response> Http::_attachContainerInput(
       connection.disconnected()
         .onAny([connection]() {});
 
+      return connection.send(request)
+        .onAny(defer(
+            slave->self(),
+            [=](const Future<Response>&) {
+              // After we have received a response for `ATTACH_CONTAINER_INPUT`
+              // call, we need to send an acknowledgment to the IOSwitchboard,
+              // so that the IOSwitchboard process can terminate itself. This is
+              // a workaround for the problem with dropping outstanding HTTP
+              // responses due to a lack of graceful shutdown in libprocess.
+              acknowledgeContainerInputResponse(containerId);
+            }));
+    });
+}
+
+
+Future<Response> Http::acknowledgeContainerInputResponse(
+    const ContainerID& containerId) const {
+  return slave->containerizer->attach(containerId)
+    .then([](Connection connection) {
+      Request request;
+      request.method = "POST";
+      request.type = Request::BODY;
+      request.url.domain = "";
+      request.url.path = "/acknowledge_container_input_response";
+
       return connection.send(request);
     });
 }
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 1619bb7..fc13a9a 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -300,6 +300,9 @@ private:
       process::Owned<recordio::Reader<agent::Call>>&& decoder,
       const RequestMediaTypes& mediaTypes) const;
 
+  process::Future<process::http::Response> acknowledgeContainerInputResponse(
+      const ContainerID& containerId) const;
+
   process::Future<process::http::Response> attachContainerOutput(
       const mesos::agent::Call& call,
       const RequestMediaTypes& mediaTypes,
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index f217d02..eef6020 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -121,6 +121,19 @@ protected:
     return connection.send(request, true);
   }
 
+  // Helper that sends an acknowledgment for the `ATTACH_CONTAINER_INPUT`
+  // request.
+  Future<http::Response> acknowledgeContainerInputResponse(
+      http::Connection connection) const {
+    http::Request request;
+    request.method = "POST";
+    request.type = http::Request::BODY;
+    request.url.domain = "";
+    request.url.path = "/acknowledge_container_input_response";
+
+    return connection.send(request);
+  }
+
   // Reads `ProcessIO::Data` records from the pipe `reader` until EOF is reached
   // and returns the merged stdout and stderr.
   // NOTE: It ignores any `ProcessIO::Control` records.
@@ -578,6 +591,8 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
 
+  acknowledgeContainerInputResponse(connection);
+
   AWAIT_READY(connection.disconnect());
   AWAIT_READY(connection.disconnected());
 
@@ -689,6 +704,8 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
   // result of receiving the heartbeats.
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
 
+  acknowledgeContainerInputResponse(connection);
+
   AWAIT_READY(connection.disconnect());
   AWAIT_READY(connection.disconnected());