You are viewing a plain text version of this content. (The link to the canonical version was lost when this page was extracted to plain text.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/30 03:05:26 UTC
[5/6] git commit: Update the slave to use the libprocess Reaper.
Update the slave to use the libprocess Reaper.
Review: https://reviews.apache.org/r/17305
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0af3984
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0af3984
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0af3984
Branch: refs/heads/master
Commit: c0af39845bd010a01bf2e77b2be7d05122f1126a
Parents: 81d0181
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jan 23 14:58:36 2014 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 29 17:12:29 2014 -0800
----------------------------------------------------------------------
src/Makefile.am | 3 -
src/launcher/executor.cpp | 6 +-
src/slave/cgroups_isolator.cpp | 5 +-
src/slave/cgroups_isolator.hpp | 2 -
src/slave/process_isolator.cpp | 5 +-
src/slave/process_isolator.hpp | 2 -
src/slave/reaper.cpp | 158 -------------------------
src/slave/reaper.hpp | 92 ---------------
src/tests/environment.cpp | 12 ++
src/tests/reaper_tests.cpp | 201 --------------------------------
src/tests/slave_recovery_tests.cpp | 1 -
11 files changed, 20 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d58b46e..c307068 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -178,7 +178,6 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/http.cpp \
slave/isolator.cpp \
slave/process_isolator.cpp \
- slave/reaper.cpp \
slave/status_update_manager.cpp \
launcher/launcher.cpp \
exec/exec.cpp \
@@ -239,7 +238,6 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
slave/paths.hpp slave/state.hpp \
slave/status_update_manager.hpp \
slave/process_isolator.hpp \
- slave/reaper.hpp \
slave/slave.hpp \
tests/environment.hpp tests/script.hpp \
tests/zookeeper.hpp tests/flags.hpp tests/utils.hpp \
@@ -857,7 +855,6 @@ mesos_tests_SOURCES = \
tests/monitor_tests.cpp \
tests/paths_tests.cpp \
tests/protobuf_io_tests.cpp \
- tests/reaper_tests.cpp \
tests/registrar_tests.cpp \
tests/resource_offers_tests.cpp \
tests/resources_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index b73ab47..e30d77a 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -30,6 +30,7 @@
#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
+#include <process/reap.hpp>
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
@@ -40,8 +41,6 @@
#include "logging/logging.hpp"
-#include "slave/reaper.hpp"
-
using process::wait; // Necessary on some OS's to disambiguate.
using std::cout;
@@ -186,7 +185,7 @@ public:
std::cout << "Forked command at " << pid << std::endl;
// Monitor this process.
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(self(),
&Self::reaped,
driver,
@@ -294,7 +293,6 @@ private:
bool launched;
bool killed;
pid_t pid;
- slave::Reaper reaper;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 5298f20..690ae81 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -32,6 +32,7 @@
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
+#include <process/reap.hpp>
#include <stout/bytes.hpp>
#include <stout/check.hpp>
@@ -569,7 +570,7 @@ void CgroupsIsolator::launchExecutor(
// Store the pid of the leading process of the executor.
info->pid = pid;
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(PID<CgroupsIsolator>(this),
&CgroupsIsolator::reaped,
pid,
@@ -901,7 +902,7 @@ Future<Nothing> CgroupsIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get())
+ process::reap(run.forkedPid.get())
.onAny(defer(PID<CgroupsIsolator>(this),
&CgroupsIsolator::reaped,
run.forkedPid.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index e86062e..1a66dc6 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -41,7 +41,6 @@
#include "slave/flags.hpp"
#include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
namespace mesos {
@@ -290,7 +289,6 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper reaper;
// File descriptor to 'mesos/tasks' file in the cgroup on which we place
// an advisory lock.
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 0bc698f..748d9c2 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -29,6 +29,7 @@
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
+#include <process/reap.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
@@ -165,7 +166,7 @@ void ProcessIsolator::launchExecutor(
// Record the pid (should also be the pgid since we setsid below).
infos[frameworkId][executorId]->pid = pid;
- reaper.monitor(pid)
+ process::reap(pid)
.onAny(defer(PID<ProcessIsolator>(this),
&ProcessIsolator::reaped,
pid,
@@ -358,7 +359,7 @@ Future<Nothing> ProcessIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get())
+ process::reap(run.forkedPid.get())
.onAny(defer(PID<ProcessIsolator>(this),
&ProcessIsolator::reaped,
run.forkedPid.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index 4ae093f..bc52f33 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -34,7 +34,6 @@
#include "slave/flags.hpp"
#include "slave/isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
namespace mesos {
@@ -106,7 +105,6 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
void reaped(pid_t pid, const Future<Option<int> >& status);
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
deleted file mode 100644
index 5eabbc3..0000000
--- a/src/slave/reaper.cpp
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/id.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/try.hpp>
-
-#include <stout/utils.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-ReaperProcess::ReaperProcess()
- : ProcessBase(ID::generate("reaper")) {}
-
-
-Future<Option<int> > ReaperProcess::monitor(pid_t pid)
-{
- // Check to see if this pid exists.
- const Result<os::Process>& process = os::process(pid);
-
- if (process.isSome()) {
- // The process exists, we add it to the promises map.
- Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
- promises.put(pid, promise);
- return promise->future();
- } else if (process.isNone()) {
- LOG(WARNING) << "Cannot monitor process " << pid
- << " because it doesn't exist";
- return None();
- } else {
- return Failure(
- "Failed to monitor process " + stringify(pid) + ": " + process.error());
- }
-}
-
-
-void ReaperProcess::initialize()
-{
- reap();
-}
-
-
-void ReaperProcess::notify(pid_t pid, Option<int> status)
-{
- foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
- promise->set(status);
- }
- promises.remove(pid);
-}
-
-
-void ReaperProcess::reap()
-{
- // This method assumes that the registered PIDs are
- // 1) children: we can reap their exit status when they are
- // terminated.
- // 2) non-children: we cannot reap their exit status.
- // 3) nonexistent: already reaped elsewhere.
-
- // Reap all child processes first.
- pid_t pid;
- int status;
- while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
- // Ignore this if the process has only stopped.
- // Notify the "listeners" only if they have requested to monitor
- // this pid. Otherwise the status is discarded.
- // This means if a child pid is registered via the monitor() call
- // after it's reaped, status 'None' will be returned.
- if (!WIFSTOPPED(status) && promises.contains(pid)) {
- notify(pid, status);
- }
- }
-
- // Check whether any monitored process has exited and been reaped.
- // 1) If a child terminates before the foreach loop but after the
- // while loop, it won't be reaped until the next reap() cycle.
- // 2) If a non-child process terminates and is reaped elsewhere,
- // e.g. by init, we notify the listeners.
- // 3) If a non-child process terminates and is not yet reaped,
- // no notification is sent.
- // 4) If a child terminates before the while loop above, then we've
- // already reaped it and have the listeners notified!
- foreach (pid_t pid, utils::copy(promises.keys())) {
- const Result<os::Process>& process = os::process(pid);
-
- if (process.isError()) {
- LOG(ERROR) << "Failed to get process information for " << pid
- <<": " << process.error();
- notify(pid, None());
- } else if (process.isNone()) {
- // The process has been reaped.
- LOG(WARNING) << "Cannot get the exit status of process " << pid
- << " because it no longer exists";
- notify(pid, None());
- }
- }
-
- delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
-}
-
-
-Reaper::Reaper()
-{
- process = new ReaperProcess();
- spawn(process);
-}
-
-
-Reaper::~Reaper()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Option<int> > Reaper::monitor(pid_t pid)
-{
- return dispatch(process, &ReaperProcess::monitor, pid);
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
deleted file mode 100644
index 9a31c75..0000000
--- a/src/slave/reaper.hpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __REAPER_HPP__
-#define __REAPER_HPP__
-
-#include <list>
-#include <set>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-#include <process/process.hpp>
-
-#include <stout/multihashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class ReaperProcess;
-
-
-// TODO(vinod): Pull reaper into common or libprocess.
-class Reaper
-{
-public:
- Reaper();
- virtual ~Reaper();
-
- // Monitor the given process and notify the caller if it terminates
- // via a Future of the exit status.
- //
- // The exit status of 'pid' can only be correctly captured if the
- // calling process is the parent of 'pid' and the process hasn't
- // been reaped yet, otherwise 'None' is returned.
- // Note that an invalid pid does not cause a failed Future, but an
- // empty result ('None').
- process::Future<Option<int> > monitor(pid_t pid);
-
-private:
- ReaperProcess* process;
-};
-
-
-// Reaper implementation.
-class ReaperProcess : public process::Process<ReaperProcess>
-{
-public:
- ReaperProcess();
-
- process::Future<Option<int> > monitor(pid_t pid);
-
-protected:
- virtual void initialize();
-
- void reap();
-
- // The notification is sent only if the pid is explicitly registered
- // via the monitor() call.
- void notify(pid_t pid, Option<int> status);
-
-private:
- // Mapping from the monitored pid to all promises the pid exit
- // status should be sent to.
- multihashmap<
- pid_t, process::Owned<process::Promise<Option<int> > > > promises;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __REAPER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/environment.cpp
----------------------------------------------------------------------
diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp
index 6edce45..41b8a71 100644
--- a/src/tests/environment.cpp
+++ b/src/tests/environment.cpp
@@ -18,6 +18,10 @@
#include <gtest/gtest.h>
+#include <sys/wait.h>
+
+#include <string.h>
+
#include <list>
#include <string>
@@ -235,6 +239,14 @@ void Environment::TearDown()
}
}
directories.clear();
+
+ // Make sure we haven't left any child processes lying around.
+ Try<os::ProcessTree> pstree = os::pstree(0);
+
+ if (pstree.isSome() && !pstree.get().children.empty()) {
+ FAIL() << "Tests completed with child processes remaining:\n"
+ << pstree.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
deleted file mode 100644
index 608ec0e..0000000
--- a/src/tests/reaper_tests.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <signal.h>
-#include <unistd.h>
-
-#include <sys/wait.h>
-
-#include <gtest/gtest.h>
-
-#include <process/clock.hpp>
-#include <process/dispatch.hpp>
-#include <process/gmock.hpp>
-#include <process/gtest.hpp>
-
-#include <stout/exit.hpp>
-#include <stout/gtest.hpp>
-#include <stout/os.hpp>
-
-#include "slave/reaper.hpp"
-
-using namespace os;
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using process::Clock;
-using process::Future;
-
-using testing::_;
-using testing::DoDefault;
-
-
-// This test checks that the Reaper can monitor a non-child process.
-TEST(ReaperTest, NonChildProcess)
-{
- // The child process creates a grandchild and then exits. The
- // grandchild sleeps for 10 seconds. The process tree looks like:
- // -+- child exit 0
- // \-+- grandchild sleep 10
-
- // After the child exits, the grandchild is going to be re-parented
- // by 'init', like this:
- // -+- child (exit 0)
- // -+- grandchild sleep 10
- Try<ProcessTree> tree = Fork(None(),
- Fork(Exec("sleep 10")),
- Exec("exit 0"))();
- ASSERT_SOME(tree);
- ASSERT_EQ(1u, tree.get().children.size());
- pid_t grandchild = tree.get().children.front();
-
- Reaper reaper;
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the grand child process.
- Future<Option<int> > status = reaper.monitor(grandchild);
-
- AWAIT_READY(monitor);
-
- // This makes sure the status only becomes ready after the
- // grandchild is killed.
- EXPECT_TRUE(status.isPending());
-
- // Now kill the grand child.
- // NOTE: We send a SIGKILL here because sometimes the grand child
- // process seems to be in a hung state and not responding to
- // SIGTERM/SIGINT.
- EXPECT_EQ(0, kill(grandchild, SIGKILL));
-
- Clock::pause();
-
- // Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Status is None because pid is not an immediate child.
- ASSERT_NONE(status.get());
-
- Clock::resume();
-}
-
-
-// This test checks that the Reaper can monitor a child process with
-// accurate exit status returned.
-TEST(ReaperTest, ChildProcess)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- // The child process sleeps and will be killed by the parent.
- Try<ProcessTree> tree = Fork(None(),
- Exec("sleep 10"))();
-
- ASSERT_SOME(tree);
- pid_t child = tree.get();
-
- Reaper reaper;
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the grand child process.
- Future<Option<int> > status = reaper.monitor(child);
-
- AWAIT_READY(monitor);
-
- // Now kill the child.
- EXPECT_EQ(0, kill(child, SIGKILL));
-
- Clock::pause();
-
- // Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Check if the status is correct.
- ASSERT_SOME(status.get());
- int status_ = status.get().get();
- ASSERT_TRUE(WIFSIGNALED(status_));
- ASSERT_EQ(SIGKILL, WTERMSIG(status_));
-
- Clock::resume();
-}
-
-
-// Check that the Reaper can monitor a child process that exits
-// before monitor() is called on it.
-TEST(ReaperTest, TerminatedChildProcess)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- // The child process immediately exits.
- Try<ProcessTree> tree = Fork(None(),
- Exec("exit 0"))();
-
- ASSERT_SOME(tree);
- pid_t child = tree.get();
-
- ASSERT_SOME(os::process(child));
-
- Clock::pause();
-
- Reaper reaper;
-
- // Because reaper reaps all child processes even if they aren't
- // registered, we advance time until that happens.
- while (os::process(child).isSome()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Now we request to monitor the child process which is already
- // reaped.
-
- Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
-
- // Ask the reaper to monitor the child process.
- Future<Option<int> > status = reaper.monitor(child);
-
- AWAIT_READY(monitor);
-
- // Now advance time until the reaper sends the notification.
- while (status.isPending()) {
- Clock::advance(Seconds(1));
- Clock::settle();
- }
-
- // Ensure the reaper notifies of the terminated process.
- AWAIT_READY(status);
-
- // Invalid status is returned because it is reaped before being
- // monitored.
- ASSERT_NONE(status.get());
-
- Clock::resume();
-}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c0af3984/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 999e598..b8c5123 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -52,7 +52,6 @@
#endif
#include "slave/paths.hpp"
#include "slave/process_isolator.hpp"
-#include "slave/reaper.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"