You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2018/08/15 22:53:48 UTC
[mesos] 01/04: Call any function in a specified namespace.
This is an automated email from the ASF dual-hosted git repository.
jieyu pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a81a4f69e2514a981bbe3653ee3ee4c666dfe68e
Author: Sergey Urbanovich <se...@gmail.com>
AuthorDate: Mon Aug 13 16:20:27 2018 -0700
Call any function in a specified namespace.
The NamespaceRunner runs any function in a specified namespace. To do
that it manages a separate thread which would be re-associated with
that namespace.
Review: https://reviews.apache.org/r/68053/
(cherry picked from commit fd762943c928be810ba4c2c0aab9e9a6ebc399e8)
---
src/linux/ns.hpp | 143 +++++++++++++++++++++++++++++++++++
src/tests/containerizer/ns_tests.cpp | 24 ++++++
2 files changed, 167 insertions(+)
diff --git a/src/linux/ns.hpp b/src/linux/ns.hpp
index 0b4136b..bb40038 100644
--- a/src/linux/ns.hpp
+++ b/src/linux/ns.hpp
@@ -26,11 +26,16 @@
#include <sys/syscall.h>
+#include <queue>
#include <set>
#include <string>
+#include <thread>
+
+#include <process/future.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
+#include <stout/option.hpp>
#include <stout/result.hpp>
#include <stout/try.hpp>
@@ -178,6 +183,144 @@ Try<pid_t> clone(
// flags, e.g., CLONE_NEWNS | CLONE_NEWNET.
std::string stringify(int flags);
+
+// The NamespaceRunner runs arbitrary functions inside a specified
+// namespace. It owns one dedicated looper thread; for every submitted
+// function that thread is first re-associated with the requested
+// namespace (via `ns::setns`) and then calls the function.
+class NamespaceRunner
+{
+public:
+  NamespaceRunner()
+  {
+    // Start the looper thread. This happens in the constructor body,
+    // i.e., after `queue` has been constructed, because the thread
+    // starts reading from `queue` right away.
+    thread.reset(new std::thread(&NamespaceRunner::loop, this));
+  }
+
+  ~NamespaceRunner()
+  {
+    // Shut down the queue. This drops every function that has not
+    // been started yet and makes the looper thread leave its loop.
+    queue.shutdown();
+
+    // Wait for the thread to complete.
+    thread->join();
+    thread.reset();
+  }
+
+  // Runs `func` on the looper thread after re-associating that thread
+  // with the namespace of type `ns` referred to by `path`.
+  //
+  // The returned future is failed with the `setns` error if the
+  // re-association fails; otherwise it is set from the `Try` returned
+  // by `func`.
+  template <typename T>
+  process::Future<T> run(
+      const std::string& path,
+      const std::string& ns,
+      const lambda::function<Try<T>()>& func)
+  {
+    std::shared_ptr<process::Promise<T>> promise =
+      std::make_shared<process::Promise<T>>();
+
+    process::Future<T> future = promise->future();
+
+    // Put a function to the queue, the function will be called
+    // in the looper thread. Everything is captured by value because
+    // the call happens later, after `run` has already returned.
+    queue.put([=]{
+      Try<Nothing> setns = ::ns::setns(path, ns, false);
+      if (setns.isError()) {
+        promise->fail(setns.error());
+      } else {
+        promise->set(func());
+      }
+    });
+
+    return future;
+  }
+
+private:
+  typedef lambda::function<void()> Func;
+
+  // The thread loop: calls the queued functions one at a time, oldest
+  // first, until the queue is shut down.
+  void loop()
+  {
+    for (;;) {
+      // Blocks until a function is available or the queue is shut down.
+      Option<Func> func = queue.get();
+
+      // Stop the thread if the queue is shut down.
+      if (func.isNone()) {
+        break;
+      }
+
+      // Call the function, it re-associates the thread with the
+      // specified namespace and calls the initial user function.
+      func.get()();
+    }
+  }
+
+  // It's not safe to use process::Queue when not all of its callers are
+  // managed by libprocess. Calling Future::await() in looper thread
+  // might cause the looper thread to be donated to a libprocess Process.
+  // If that Process is very busy (e.g., master or agent Process), it's
+  // possible that the looper thread will never re-gain control.
+  //
+  // ProcessingQueue uses mutex and condition variable to solve this
+  // problem. ProcessingQueue::get() can block the thread. The main
+  // use cases for the class are thread workers and thread pools.
+  template <typename T>
+  class ProcessingQueue
+  {
+  public:
+    ProcessingQueue() : finished(false) {}
+
+    ~ProcessingQueue() = default;
+
+    // Add an element to the queue and notify one client.
+    void put(T&& t)
+    {
+      synchronized (mutex) {
+        // `T` is the class template parameter and is not deduced here,
+        // so `t` is a plain rvalue reference: `std::move` is the right
+        // cast, `std::forward` is meant for forwarding references.
+        queue.push(std::move(t));
+        cond.notify_one();
+      }
+    }
+
+    // NOTE: This function blocks the thread. It returns the oldest
+    // element from the queue and returns None() if the queue is
+    // shut down.
+    Option<T> get()
+    {
+      synchronized (mutex) {
+        // Wait for either a new queue element or queue shutdown.
+        while (queue.empty() && !finished) {
+          synchronized_wait(&cond, &mutex);
+        }
+
+        if (finished) {
+          // The queue is shut down.
+          return None();
+        }
+
+        // Return the oldest element from the queue.
+        T t = std::move(queue.front());
+        queue.pop();
+        return Some(std::move(t));
+      }
+    }
+
+    // Shut down the queue, drop all pending elements and notify all
+    // clients. After this call `get()` returns None().
+    void shutdown()
+    {
+      // The pending elements are moved into this local so that they
+      // are destroyed after the lock has been released: destroying an
+      // element can run arbitrary code (destructors of whatever it
+      // holds), which should not happen while holding `mutex`.
+      std::queue<T> dropped;
+
+      synchronized (mutex) {
+        finished = true;
+        dropped.swap(queue);
+        cond.notify_all();
+      }
+    }
+
+  private:
+    std::mutex mutex;
+    std::condition_variable cond;
+    std::queue<T> queue;
+    bool finished;
+  };
+
+  // NOTE: `queue` must stay declared before `thread`; the looper
+  // thread uses `queue` for as long as it runs.
+  ProcessingQueue<Func> queue;
+  std::unique_ptr<std::thread> thread;
+};
+
} // namespace ns {
#endif // __LINUX_NS_HPP__
diff --git a/src/tests/containerizer/ns_tests.cpp b/src/tests/containerizer/ns_tests.cpp
index fa4349e..14aad89 100644
--- a/src/tests/containerizer/ns_tests.cpp
+++ b/src/tests/containerizer/ns_tests.cpp
@@ -266,6 +266,30 @@ TEST(NsTest, ROOT_clone)
EXPECT_NONE(status.get());
}
+
+// Tests that ns::NamespaceRunner delivers the result of a function run
+// in a valid namespace, and fails the future when the requested
+// namespace type does not match the given path.
+TEST(NsTest, ROOT_NamespaceRunner)
+{
+  // Constructing the runner starts its looper thread.
+  ns::NamespaceRunner runner;
+
+  // A dummy function whose result is easy to recognize.
+  lambda::function<Try<int>()> f = []() -> Try<int> {
+    return 42;
+  };
+
+  // Run the function in our own networking namespace; the future must
+  // become ready with the function's result.
+  process::Future<int> r = runner.run("/proc/self/ns/net", "net", f);
+  AWAIT_READY(r);
+  EXPECT_EQ(42, r.get());
+
+  // Run the function with an invalid namespace type: the path refers
+  // to a networking namespace, so asking for "mnt" is expected to fail.
+  r = runner.run("/proc/self/ns/net", "mnt", f);
+  AWAIT_FAILED(r);
+  EXPECT_EQ("Invalid argument", r.failure());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {