You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/01/28 01:25:59 UTC

[1/2] mesos git commit: Fixed LogrotateContainerLogger's FD ownership.

Repository: mesos
Updated Branches:
  refs/heads/master 3558d5951 -> f3536fad9


Fixed LogrotateContainerLogger's FD ownership.

Changes the logrotate container logger to manually construct and deal
with pipes.  Specifically, both read and write ends of the pipe must
end up in the child processes (read -> logger executables, write ->
container).

If ownership is not transferred, the pipe's FDs may be closed (again)
when `Subprocess` is destructed.  Because FD numbers are reused once
closed, that second close may unexpectedly close unrelated FDs
belonging to other threads.

Review: https://reviews.apache.org/r/42865/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f3536fad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f3536fad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f3536fad

Branch: refs/heads/master
Commit: f3536fad9dfb013a2b4718ad5bddb60148b14334
Parents: 256a053
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Jan 27 16:25:40 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jan 27 16:25:57 2016 -0800

----------------------------------------------------------------------
 src/slave/container_loggers/lib_logrotate.cpp | 61 ++++++++++++++++++++--
 1 file changed, 57 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f3536fad/src/slave/container_loggers/lib_logrotate.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_loggers/lib_logrotate.cpp b/src/slave/container_loggers/lib_logrotate.cpp
index bfc7cad..8d2f895 100644
--- a/src/slave/container_loggers/lib_logrotate.cpp
+++ b/src/slave/container_loggers/lib_logrotate.cpp
@@ -38,6 +38,7 @@
 #include <stout/path.hpp>
 
 #include <stout/os/environment.hpp>
+#include <stout/os/fcntl.hpp>
 #include <stout/os/killtree.hpp>
 
 #include "slave/container_loggers/logrotate.hpp"
@@ -88,6 +89,31 @@ public:
     environment.erase("LIBPROCESS_ADVERTISE_IP");
     environment.erase("LIBPROCESS_ADVERTISE_PORT");
 
+    // NOTE: We manually construct a pipe here instead of using
+    // `Subprocess::PIPE` so that the ownership of the FDs is properly
+    // represented.  The `Subprocess` spawned below owns the read-end
+    // of the pipe and will be solely responsible for closing that end.
+    // The ownership of the write-end will be passed to the caller
+    // of this function.
+    int pipefd[2];
+    if (::pipe(pipefd) == -1) {
+      return Failure(ErrnoError("Failed to create pipe").message);
+    }
+
+    Subprocess::IO::InputFileDescriptors outfds;
+    outfds.read = pipefd[0];
+    outfds.write = pipefd[1];
+
+    // NOTE: We need to `cloexec` this FD so that it will be closed when
+    // the child subprocess is spawned and so that the FD will not be
+    // inherited by the second child for stderr.
+    Try<Nothing> cloexec = os::cloexec(outfds.write.get());
+    if (cloexec.isError()) {
+      os::close(outfds.read);
+      os::close(outfds.write.get());
+      return Failure("Failed to cloexec: " + cloexec.error());
+    }
+
     // Spawn a process to handle stdout.
     mesos::internal::logger::rotate::Flags outFlags;
     outFlags.max_size = flags.max_stdout_size;
@@ -98,16 +124,40 @@ public:
     Try<Subprocess> outProcess = subprocess(
         path::join(flags.launcher_dir, mesos::internal::logger::rotate::NAME),
         {mesos::internal::logger::rotate::NAME},
-        Subprocess::PIPE(),
+        Subprocess::FD(outfds.read, Subprocess::IO::OWNED),
         Subprocess::PATH("/dev/null"),
         Subprocess::FD(STDERR_FILENO),
         outFlags,
         environment);
 
     if (outProcess.isError()) {
+      os::close(outfds.write.get());
       return Failure("Failed to create logger process: " + outProcess.error());
     }
 
+    // NOTE: We manually construct a pipe here to properly express
+    // ownership of the FDs.  See the NOTE above.
+    if (::pipe(pipefd) == -1) {
+      os::close(outfds.write.get());
+      os::killtree(outProcess.get().pid(), SIGKILL);
+      return Failure(ErrnoError("Failed to create pipe").message);
+    }
+
+    Subprocess::IO::InputFileDescriptors errfds;
+    errfds.read = pipefd[0];
+    errfds.write = pipefd[1];
+
+    // NOTE: We need to `cloexec` this FD so that it will be closed when
+    // the child subprocess is spawned.
+    cloexec = os::cloexec(errfds.write.get());
+    if (cloexec.isError()) {
+      os::close(outfds.write.get());
+      os::close(errfds.read);
+      os::close(errfds.write.get());
+      os::killtree(outProcess.get().pid(), SIGKILL);
+      return Failure("Failed to cloexec: " + cloexec.error());
+    }
+
     // Spawn a process to handle stderr.
     mesos::internal::logger::rotate::Flags errFlags;
     errFlags.max_size = flags.max_stderr_size;
@@ -118,20 +168,23 @@ public:
     Try<Subprocess> errProcess = subprocess(
         path::join(flags.launcher_dir, mesos::internal::logger::rotate::NAME),
         {mesos::internal::logger::rotate::NAME},
-        Subprocess::PIPE(),
+        Subprocess::FD(errfds.read, Subprocess::IO::OWNED),
         Subprocess::PATH("/dev/null"),
         Subprocess::FD(STDERR_FILENO),
         errFlags,
         environment);
 
     if (errProcess.isError()) {
+      os::close(outfds.write.get());
+      os::close(errfds.write.get());
       os::killtree(outProcess.get().pid(), SIGKILL);
       return Failure("Failed to create logger process: " + errProcess.error());
     }
 
+    // NOTE: The ownership of these FDs is given to the caller of this function.
     ContainerLogger::SubprocessInfo info;
-    info.out = SubprocessInfo::IO::FD(outProcess->in().get());
-    info.err = SubprocessInfo::IO::FD(errProcess->in().get());
+    info.out = SubprocessInfo::IO::FD(outfds.write.get());
+    info.err = SubprocessInfo::IO::FD(errfds.write.get());
     return info;
   }
 


[2/2] mesos git commit: Add test for LogrotateContainerLogger's FD management.

Posted by be...@apache.org.
Add test for LogrotateContainerLogger's FD management.

Adds a test which checks for erroneous calls to `os::close` by the
LogrotateContainerLogger.  This may happen by accident if the
container logger module uses `Subprocess::PIPE` when launching child
processes, because libprocess will track these FDs and close them
(possibly even if they've already been closed) when the child
processes exit.

Review: https://reviews.apache.org/r/42880/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/256a053a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/256a053a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/256a053a

Branch: refs/heads/master
Commit: 256a053a6a50f9e5ce44e134d96a72bb98a1b885
Parents: 3558d59
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Wed Jan 27 16:25:28 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jan 27 16:25:57 2016 -0800

----------------------------------------------------------------------
 src/tests/container_logger_tests.cpp | 99 +++++++++++++++++++++++++++++++
 1 file changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/256a053a/src/tests/container_logger_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/container_logger_tests.cpp b/src/tests/container_logger_tests.cpp
index 5fe9cce..e161fd6 100644
--- a/src/tests/container_logger_tests.cpp
+++ b/src/tests/container_logger_tests.cpp
@@ -31,6 +31,7 @@
 #include <stout/try.hpp>
 
 #include <stout/os/exists.hpp>
+#include <stout/os/killtree.hpp>
 #include <stout/os/mkdir.hpp>
 #include <stout/os/pstree.hpp>
 #include <stout/os/stat.hpp>
@@ -484,6 +485,104 @@ TEST_F(ContainerLoggerTest, LOGROTATE_RotateInSandbox)
   }
 }
 
+
+// Tests that the logrotate container logger only closes FDs when it
+// is supposed to and does not interfere with other FDs on the agent.
+TEST_F(ContainerLoggerTest, LOGROTATE_ModuleFDOwnership)
+{
+  // Create a master, agent, and framework.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // We'll need access to these flags later.
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Use the non-default container logger that rotates logs.
+  flags.container_logger = LOGROTATE_CONTAINER_LOGGER_NAME;
+
+  Fetcher fetcher;
+
+  // We use an actual containerizer + executor since we want something to run.
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+  CHECK_SOME(containerizer);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  // Wait for an offer, and start a task.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Start a task that will keep running until the end of the test.
+  TaskInfo task = createTask(offers.get()[0], "sleep 100");
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusKilled))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  // Open multiple files, so that we're fairly certain we've opened
+  // the same FDs (integers) opened by the container logger.
+  vector<int> fds;
+  for (int i = 0; i < 50; i++) {
+    Try<int> fd = os::open("/dev/null", O_RDONLY);
+    ASSERT_SOME(fd);
+
+    fds.push_back(fd.get());
+  }
+
+  // Kill the task, which also kills the executor.
+  driver.killTask(statusRunning.get().task_id());
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled.get().state());
+
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  AWAIT_READY(executorTerminated);
+
+  // Close all the FDs we opened.  Every `close` should succeed.
+  foreach (int fd, fds) {
+    ASSERT_SOME(os::close(fd));
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {