You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2018/08/15 22:53:48 UTC

[mesos] 01/04: Call any function in a specified namespace.

This is an automated email from the ASF dual-hosted git repository.

jieyu pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a81a4f69e2514a981bbe3653ee3ee4c666dfe68e
Author: Sergey Urbanovich <se...@gmail.com>
AuthorDate: Mon Aug 13 16:20:27 2018 -0700

    Call any function in a specified namespace.
    
    The NamespaceRunner runs any function in a specified namespace. To do
    that, it manages a dedicated thread that is re-associated with that
    namespace before the function is called.
    
    Review: https://reviews.apache.org/r/68053/
    (cherry picked from commit fd762943c928be810ba4c2c0aab9e9a6ebc399e8)
---
 src/linux/ns.hpp                     | 143 +++++++++++++++++++++++++++++++++++
 src/tests/containerizer/ns_tests.cpp |  24 ++++++
 2 files changed, 167 insertions(+)

diff --git a/src/linux/ns.hpp b/src/linux/ns.hpp
index 0b4136b..bb40038 100644
--- a/src/linux/ns.hpp
+++ b/src/linux/ns.hpp
@@ -26,11 +26,16 @@
 
 #include <sys/syscall.h>
 
+#include <queue>
 #include <set>
 #include <string>
+#include <thread>
+
+#include <process/future.hpp>
 
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 #include <stout/result.hpp>
 #include <stout/try.hpp>
 
@@ -178,6 +183,144 @@ Try<pid_t> clone(
 // flags, e.g., CLONE_NEWNS | CLONE_NEWNET.
 std::string stringify(int flags);
 
+
+// The NamespaceRunner runs functions inside a specified namespace. It
+// owns a single looper thread; before each function is called, that
+// thread is re-associated with the requested namespace.
+class NamespaceRunner
+{
+public:
+  NamespaceRunner()
+  {
+    // Start the looper thread.
+    thread.reset(new std::thread(&NamespaceRunner::loop, this));
+  }
+
+  ~NamespaceRunner()
+  {
+    // Shut down the queue; still-queued functions are dropped.
+    queue.shutdown();
+    // Wait for the looper thread to complete.
+    thread->join();
+    thread.reset();
+  }
+
+  // Runs `func` on the looper thread once `ns::setns(path, ns)` succeeds.
+  // The returned future fails with the `setns` error otherwise.
+  template <typename T>
+  process::Future<T> run(
+      const std::string& path,
+      const std::string& ns,
+      const lambda::function<Try<T>()>& func)
+  {
+    std::shared_ptr<process::Promise<T>> promise(
+        new process::Promise<T>);
+    process::Future<T> future = promise->future();
+
+    // Enqueue a closure for the looper thread: it first re-associates
+    // the thread with the namespace and only then calls `func`.
+    queue.put([=]{
+      Try<Nothing> setns = ::ns::setns(path, ns, false);
+      if (setns.isError()) {
+        promise->fail(setns.error());
+      } else {
+        promise->set(func());
+      }
+    });
+
+    return future;
+  }
+
+private:
+  typedef lambda::function<void()> Func;
+
+  // The looper thread body: runs queued functions until shutdown.
+  void loop()
+  {
+    for (;;) {
+      // Block until a function is queued or the queue is shut down.
+      Option<Func> func = queue.get();
+
+      // Stop the thread if the queue has been shut down.
+      if (func.isNone()) {
+        break;
+      }
+
+      // Call the function: it re-associates the thread with the
+      // specified namespace and then calls the user's function.
+      func.get()();
+    }
+  }
+
+  // It's not safe to use process::Queue when not all of its callers are
+  // managed by libprocess. Calling Future::await() in the looper thread
+  // might cause the looper thread to be donated to a libprocess Process.
+  // If that Process is very busy (e.g., master or agent Process), it's
+  // possible that the looper thread will never re-gain control.
+  //
+  // ProcessingQueue uses mutex and condition variable to solve this
+  // problem. ProcessingQueue::get() can block the thread. The main
+  // use cases for the class are thread workers and thread pools.
+  template <typename T>
+  class ProcessingQueue
+  {
+  public:
+    ProcessingQueue() : finished(false) {}
+
+    ~ProcessingQueue() = default;
+
+    // Add an element to the queue and notify one client.
+    void put(T&& t)
+    {
+      synchronized (mutex) {
+        // `T&&` is a plain rvalue reference here, so move (not forward).
+        queue.push(std::move(t));
+        cond.notify_one();
+      }
+    }
+
+    // NOTE: This function blocks the calling thread. It returns the oldest
+    // element, or None() once the queue has been shut down.
+    Option<T> get()
+    {
+      synchronized (mutex) {
+        // Wait for either a new queue element or queue shutdown.
+        while (queue.empty() && !finished) {
+          synchronized_wait(&cond, &mutex);
+        }
+
+        if (finished) {
+          // The queue has been shut down.
+          return None();
+        }
+
+        // Return the oldest element from the queue.
+        T t = std::move(queue.front());
+        queue.pop();
+        return Some(std::move(t));
+      }
+    }
+
+    // Shut down the queue: drop pending elements and notify all clients.
+    void shutdown() {
+      synchronized (mutex) {
+        finished = true;
+        std::queue<T>().swap(queue);
+        cond.notify_all();
+      }
+    }
+
+  private:
+    std::mutex mutex;
+    std::condition_variable cond;
+    std::queue<T> queue;
+    bool finished;
+  };
+
+  ProcessingQueue<Func> queue;
+  std::unique_ptr<std::thread> thread;
+};
+
 } // namespace ns {
 
 #endif // __LINUX_NS_HPP__
diff --git a/src/tests/containerizer/ns_tests.cpp b/src/tests/containerizer/ns_tests.cpp
index fa4349e..14aad89 100644
--- a/src/tests/containerizer/ns_tests.cpp
+++ b/src/tests/containerizer/ns_tests.cpp
@@ -266,6 +266,30 @@ TEST(NsTest, ROOT_clone)
   EXPECT_NONE(status.get());
 }
 
+
+// Tests ns::NamespaceRunner: a successful run and a failing `setns`.
+TEST(NsTest, ROOT_NamespaceRunner)
+{
+  process::Future<int> r;
+
+  // Constructing the runner starts its looper thread.
+  ns::NamespaceRunner runner;
+
+  // A dummy function to run in the process's own network namespace.
+  lambda::function<Try<int>()> f = []() -> Try<int> {
+    return 42;
+  };
+
+  r = runner.run("/proc/self/ns/net", "net", f);
+  AWAIT_READY(r);
+  EXPECT_EQ(r.get(), 42);
+
+  // "mnt" does not match the net namespace file, so the run must fail.
+  r = runner.run("/proc/self/ns/net", "mnt", f);
+  AWAIT_FAILED(r);
+  EXPECT_EQ(r.failure(), "Invalid argument");
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {