You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/10/24 23:19:05 UTC

[1/8] mesos git commit: Libprocess Reinit: Added SocketManager and ProcessManager cleanup.

Repository: mesos
Updated Branches:
  refs/heads/master 99f290bcb -> 16f479d15


Libprocess Reinit: Added SocketManager and ProcessManager cleanup.

The `SocketManager` and `ProcessManager` are highly inter-dependent,
which requires some untangling in `process::finalize`.

* Logic originally found in `~ProcessManager` has been moved into
  `ProcessManager::finalize` due to what happens during cleanup.
* The future from `__s__->accept()` must be explicitly discarded as
  libevent does not detect a locally closed socket.
* Terminating `HttpProxy`s must close the associated socket.

Review: https://reviews.apache.org/r/40266/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eaee376e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eaee376e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eaee376e

Branch: refs/heads/master
Commit: eaee376ea69e84b469c50a65f7ef5eb0e3aad82f
Parents: 99f290b
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:30 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:17:23 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 218 ++++++++++++++++++++++++++-----
 1 file changed, 186 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eaee376e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 18a8e20..35ed637 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -301,6 +301,11 @@ public:
   SocketManager();
   ~SocketManager();
 
+  // Closes all managed sockets and clears any associated metadata.
+  // The `__s__` server socket must be closed and `ProcessManager`
+  // must be finalized before calling this.
+  void finalize();
+
   void accepted(const Socket& socket);
 
   void link(ProcessBase* process,
@@ -314,6 +319,11 @@ public:
 
   PID<HttpProxy> proxy(const Socket& socket);
 
+  // Used to clean up the pointer to an `HttpProxy` in case the
+  // `HttpProxy` is killed outside the control of the `SocketManager`.
+  // This generally happens when `process::finalize` is called.
+  void unproxy(const Socket& socket);
+
   void send(Encoder* encoder, bool persist);
   void send(const Response& response,
             const Request& request,
@@ -400,6 +410,13 @@ public:
   explicit ProcessManager(const Option<string>& delegate);
   ~ProcessManager();
 
+  // Prevents any further processes from spawning and terminates all
+  // running processes. The special `gc` process will be terminated
+  // last. Then joins all processing threads and stops the event loop.
+  //
+  // This is a prerequisite for finalizing the `SocketManager`.
+  void finalize();
+
   // Initializes the processing threads and the event loop thread,
   // and returns the number of processing threads created.
   long init_threads();
@@ -420,7 +437,10 @@ public:
       Event* event,
       ProcessBase* sender = nullptr);
 
+  // TODO(josephw): Change the return type to a `Try<UPID>`. Currently,
+  // if this method fails, we return a default constructed `UPID`.
   UPID spawn(ProcessBase* process, bool manage);
+
   void resume(ProcessBase* process);
   void cleanup(ProcessBase* process);
 
@@ -470,6 +490,10 @@ private:
   // List of rules applied to all incoming HTTP requests.
   vector<Owned<firewall::FirewallRule>> firewallRules;
   std::recursive_mutex firewall_mutex;
+
+  // Whether the process manager is finalizing or not.
+  // If true, no further processes will be spawned.
+  std::atomic_bool finalizing;
 };
 
 
@@ -479,6 +503,15 @@ static const int LISTEN_BACKLOG = 500000;
 // Local server socket.
 static Socket* __s__ = nullptr;
 
+// This mutex is only used to prevent a race between the `on_accept`
+// callback loop and closing/deleting `__s__` in `process::finalize`.
+static std::mutex* socket_mutex = new std::mutex();
+
+// The future returned by the last call to `__s__->accept()`.
+// This is used in `process::finalize` to explicitly terminate the
+// `__s__` socket's callback loop.
+static Future<Socket> future_accept;
+
 // Local socket address.
 static Address __address__;
 
@@ -795,8 +828,13 @@ void on_accept(const Future<Socket>& socket)
           decoder));
   }
 
-  __s__->accept()
-    .onAny(lambda::bind(&on_accept, lambda::_1));
+  // NOTE: `__s__` may be cleaned up during `process::finalize`.
+  synchronized (socket_mutex) {
+    if (__s__ != nullptr) {
+      future_accept = __s__->accept()
+        .onAny(lambda::bind(&on_accept, lambda::_1));
+    }
+  }
 }
 
 } // namespace internal {
@@ -1002,7 +1040,7 @@ bool initialize(
   // invoke `accept()` and `spawn()` below.
   initialize_complete.store(true);
 
-  __s__->accept()
+  future_accept = __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
 
   // TODO(benh): Make sure creating the garbage collector, logging
@@ -1079,30 +1117,65 @@ bool initialize(
 // initialization.
 void finalize()
 {
-  // The clock is only paused during tests.  Pausing may lead to infinite waits
-  // during clean up, so we make sure the clock is running normally.
+  // The clock is only paused during tests.  Pausing may lead to infinite
+  // waits during clean up, so we make sure the clock is running normally.
   Clock::resume();
 
-  // This will terminate any existing processes created via `spawn()`,
-  // like `gc`, `help`, `Logging()`, `Profiler()`, and `System()`.
-  // NOTE: This will also stop the event loop.
-
-  // TODO(arojas): The HTTP authentication logic in ProcessManager
-  // does not handle the case where the process_manager is deleted
-  // while authentication was in progress!!
-
+  // This will terminate the underlying process for the `Route`.
   delete processes_route;
   processes_route = nullptr;
 
+  // Terminate all running processes and prevent further processes from
+  // being spawned. This will also clean up any metadata for running
+  // processes held by the `SocketManager`. After this method returns,
+  // libprocess should be single-threaded.
+  process_manager->finalize();
+
+  // This clears any remaining timers. Because the event loop has been
+  // stopped, no timers will fire.
+  Clock::finalize();
+
+  // Close the server socket.
+  // This will prevent any further connections managed by the `SocketManager`.
+  synchronized (socket_mutex) {
+    // Explicitly terminate the callback loop used to accept incoming
+    // connections. This is necessary as the server socket ignores
+    // most errors, including when the server socket has been closed.
+    future_accept.discard();
+
+    delete __s__;
+    __s__ = nullptr;
+  }
+
+  // Clean up the socket manager.
+  // Terminating processes above will also clean up any links between
+  // processes (which may be expressed as open sockets) and the various
+  // `HttpProxy` processes managing incoming HTTP requests. We cannot
+  // delete the `SocketManager` yet, since the `ProcessManager` may
+  // potentially dereference it.
+  socket_manager->finalize();
+
+  // This is dereferenced inside `ProcessBase::visit(HttpEvent&)`.
+  // We can safely delete it since no further incoming HTTP connections
+  // can be made because the server socket has been destroyed. This must
+  // be deleted before the `ProcessManager` as it will indirectly
+  // dereference the `ProcessManager`.
+  delete authenticator_manager;
+  authenticator_manager = nullptr;
+
+  // At this point, there should be no running processes, no sockets,
+  // and a single remaining thread. We can safely remove the global
+  // `SocketManager` and `ProcessManager` pointers now.
+  delete socket_manager;
+  socket_manager = nullptr;
+
   delete process_manager;
   process_manager = nullptr;
 
-  // TODO(neilc): We currently don't cleanup or deallocate the
-  // socket_manager (MESOS-3910).
-
-  // The clock must be cleaned up after the `process_manager` as processes
-  // may otherwise add timers after cleaning up.
-  Clock::finalize();
+  // Clear the public address of the server socket.
+  // NOTE: This variable is necessary for process communication, so it
+  // cannot be cleared until after the `ProcessManager` is deleted.
+  __address__ = Address::LOCALHOST_ANY();
 }
 
 
@@ -1165,6 +1238,10 @@ HttpProxy::~HttpProxy()
     items.pop();
     delete item;
   }
+
+  // Just in case this process gets killed outside of `SocketManager::close`,
+  // remove the proxy from the socket.
+  socket_manager->unproxy(socket);
 }
 
 
@@ -1395,6 +1472,36 @@ SocketManager::SocketManager() {}
 SocketManager::~SocketManager() {}
 
 
+void SocketManager::finalize()
+{
+  // We require the `SocketManager` to be finalized after the server socket
+  // has been closed. This means that no further incoming sockets will be
+  // given to the `SocketManager` at this point.
+  CHECK(__s__ == nullptr);
+
+  // We require all processes to be terminated prior to finalizing the
+  // `SocketManager`. This simplifies the finalization logic as we do not
+  // have to worry about sockets or links being created during cleanup.
+  CHECK(gc == nullptr);
+
+  int socket = -1;
+  // Close each socket.
+  // Don't hold the lock since there is a dependency between `SocketManager`
+  // and `ProcessManager`, which may result in deadlock.  See comments in
+  // `SocketManager::close` for more details.
+  do {
+    synchronized (mutex) {
+      socket = !sockets.empty() ? sockets.begin()->first : -1;
+    }
+
+    if (socket >= 0) {
+      // This will also clean up any other state related to this socket.
+      close(socket);
+    }
+  } while (socket >= 0);
+}
+
+
 void SocketManager::accepted(const Socket& socket)
 {
   synchronized (mutex) {
@@ -1715,6 +1822,20 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
 }
 
 
+void SocketManager::unproxy(const Socket& socket)
+{
+  synchronized (mutex) {
+    auto proxy = proxies.find(socket);
+
+    // NOTE: We may have already removed this proxy if the associated
+    // `HttpProxy` was destructed via `SocketManager::close`.
+    if (proxy != proxies.end()) {
+      proxies.erase(proxy);
+    }
+  }
+}
+
+
 namespace internal {
 
 void _send(
@@ -2336,49 +2457,68 @@ void SocketManager::swap_implementing_socket(
 ProcessManager::ProcessManager(const Option<string>& _delegate)
   : delegate(_delegate),
     running(0),
-    joining_threads(false) {}
+    joining_threads(false),
+    finalizing(false) {}
+
 
+ProcessManager::~ProcessManager() {}
 
-ProcessManager::~ProcessManager()
+
+void ProcessManager::finalize()
 {
   CHECK(gc != nullptr);
 
+  // Prevent any more processes from being spawned.
+  finalizing.store(true);
+
   // Terminate one process at a time. Events are deleted and the process
   // is erased from `processes` in ProcessManager::cleanup(). Don't hold
   // the lock or process the whole map as terminating one process might
   // trigger other terminations.
   //
-  // We skip the GC process here and instead terminate it below. This
-  // ensures that the GC process is running whenever we terminate any
-  // GC-managed process, which is necessary to ensure GC is performed.
+  // We skip the GC process in this loop and instead terminate it last.
+  // This ensures that the GC process is running whenever we terminate
+  // any GC-managed process, which is necessary to prevent leaking.
   while (true) {
-    ProcessBase* process = nullptr;
+    // NOTE: We terminate by `UPID` rather than `ProcessBase` as the
+    // process may terminate between the synchronized section below
+    // and the calls to `process::terminate` and `process::wait`.
+    // If the process has already terminated, further termination
+    // is a noop.
+    UPID pid;
 
     synchronized (processes_mutex) {
+      ProcessBase* process = nullptr;
+
       foreachvalue (ProcessBase* candidate, processes) {
         if (candidate == gc) {
           continue;
         }
+
         process = candidate;
+        pid = candidate->self();
         break;
       }
-    }
 
-    if (process == nullptr) {
-      break;
+      if (process == nullptr) {
+        break;
+      }
     }
 
     // Terminate this process but do not inject the message,
     // i.e. allow it to finish its work first.
-    process::terminate(process, false);
-    process::wait(process);
+    process::terminate(pid, false);
+    process::wait(pid);
   }
 
+  // Terminate `gc`.
   process::terminate(gc, false);
   process::wait(gc);
 
-  delete gc;
-  gc = nullptr;
+  synchronized (processes_mutex) {
+    delete gc;
+    gc = nullptr;
+  }
 
   // Send signal to all processing threads to stop running.
   joining_threads.store(true);
@@ -2710,6 +2850,20 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 {
   CHECK(process != nullptr);
 
+  // If the `ProcessManager` is cleaning itself up, no further processes
+  // may be spawned.
+  if (finalizing.load()) {
+    LOG(WARNING)
+      << "Attempted to spawn a process (" << process->self()
+      << ") after finalizing libprocess!";
+
+    if (manage) {
+      delete process;
+    }
+
+    return UPID();
+  }
+
   synchronized (processes_mutex) {
     if (processes.count(process->pid.id) > 0) {
       return UPID();


[3/8] mesos git commit: Libprocess Reinit: Moved HttpProxy finalization and destruction.

Posted by jo...@apache.org.
Libprocess Reinit: Moved HttpProxy finalization and destruction.

Moves the destructor code in `HttpProxy` into its override of the
`Process::finalize` function. Also changes the `HttpProxy` termination
logic to terminate via `UPID`, which guards against double-termination.

Removes some unused initialization code, too.

Review: https://reviews.apache.org/r/50621/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a64d7a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a64d7a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a64d7a4

Branch: refs/heads/master
Commit: 0a64d7a44296bc030581f3a0f8ca16fe0a5c06bb
Parents: b851894
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:34 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:46 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 33 +++++++-------------------------
 1 file changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0a64d7a4/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 15e80e6..2be8e84 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -188,7 +188,7 @@ class HttpProxy : public Process<HttpProxy>
 {
 public:
   explicit HttpProxy(const Socket& _socket);
-  virtual ~HttpProxy();
+  virtual ~HttpProxy() {};
 
   // Enqueues the response to be sent once all previously enqueued
   // responses have been processed (e.g., waited for and sent).
@@ -200,7 +200,7 @@ public:
   void handle(const Future<Response>& future, const Request& request);
 
 protected:
-  void initialize() override;
+  void finalize() override;
 
 private:
   // Starts "waiting" on the next available future response.
@@ -233,15 +233,6 @@ private:
   queue<Item*> items;
 
   Option<http::Pipe::Reader> pipe; // Current pipe, if streaming.
-
-  // We sequence the authentication results exposed to the caller
-  // in order to satisfy HTTP pipelining.
-  //
-  // Note that this needs to be done explicitly here because
-  // the authentication router does expose ordered completion
-  // of its Futures (it doesn't have the knowledge of sockets
-  // necessary to do it in a per-connection manner).
-  Owned<Sequence> authentications;
 };
 
 
@@ -1214,7 +1205,7 @@ HttpProxy::HttpProxy(const Socket& _socket)
     socket(_socket) {}
 
 
-HttpProxy::~HttpProxy()
+void HttpProxy::finalize()
 {
   // Need to make sure response producers know not to continue to
   // create a response (streaming or otherwise).
@@ -1253,16 +1244,6 @@ HttpProxy::~HttpProxy()
 }
 
 
-void HttpProxy::initialize()
-{
-  // We have to construct the sequence outside of the HttpProxy
-  // constructor in order to prevent a deadlock between the
-  // SocketManager and the ProcessManager (see the comment in
-  // SocketManager::proxy).
-  authentications.reset(new Sequence("__authentications__"));
-}
-
-
 void HttpProxy::enqueue(const Response& response, const Request& request)
 {
   handle(Future<Response>(response), request);
@@ -2218,7 +2199,7 @@ Encoder* SocketManager::next(int s)
 
 void SocketManager::close(int s)
 {
-  HttpProxy* proxy = nullptr; // Non-null if needs to be terminated.
+  Option<UPID> proxy; // Some if an `HttpProxy` needs to be terminated.
 
   synchronized (mutex) {
     // This socket might not be active if it was already asked to get
@@ -2255,7 +2236,7 @@ void SocketManager::close(int s)
 
       // Clean up any proxy associated with this socket.
       if (proxies.count(s) > 0) {
-        proxy = proxies[s];
+        proxy = proxies.at(s)->self();
         proxies.erase(s);
       }
 
@@ -2288,8 +2269,8 @@ void SocketManager::close(int s)
 
   // We terminate the proxy outside the synchronized block to avoid
   // possible deadlock between the ProcessManager and SocketManager.
-  if (proxy != nullptr) {
-    terminate(proxy);
+  if (proxy.isSome()) {
+    terminate(proxy.get());
   }
 
   // Note that we don't actually:


[2/8] mesos git commit: Libprocess Reinit: Removed authorizer callback leaks.

Posted by jo...@apache.org.
Libprocess Reinit: Removed authorizer callback leaks.

The `authorization_callbacks` were never being freed after being set
or unset.  If you run the tests in repetition, this leaks memory.

Review: https://reviews.apache.org/r/51049/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8518940
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8518940
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8518940

Branch: refs/heads/master
Commit: b851894079565aff88e23dec087bd90a0a832c6b
Parents: eaee376
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:32 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:10 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b8518940/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 35ed637..15e80e6 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -582,12 +582,20 @@ namespace authorization {
 
 void setCallbacks(const AuthorizationCallbacks& callbacks)
 {
+  if (authorization_callbacks != nullptr) {
+    delete authorization_callbacks;
+  }
+
   authorization_callbacks = new AuthorizationCallbacks(callbacks);
 }
 
 
 void unsetCallbacks()
 {
+  if (authorization_callbacks != nullptr) {
+    delete authorization_callbacks;
+  }
+
   authorization_callbacks = nullptr;
 }
 


[6/8] mesos git commit: Libprocess Reinit: Moved ReaperProcess instantiation into process.cpp.

Posted by jo...@apache.org.
Libprocess Reinit: Moved ReaperProcess instantiation into process.cpp.

The reaper singleton must be unified with `process::initialize` so
that it also falls under the scope of reinitialization.  The singleton
must also not be guarded by `Once`.

Review: https://reviews.apache.org/r/40413/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e3aeb24
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e3aeb24
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e3aeb24

Branch: refs/heads/master
Commit: 3e3aeb240fcda3001e62f628fe78ba66f9a8f948
Parents: 757727c
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:39 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:51 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/reap.hpp |  33 +++++
 3rdparty/libprocess/src/process.cpp          |  11 ++
 3rdparty/libprocess/src/reap.cpp             | 156 +++++++++++-----------
 3 files changed, 120 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/include/process/reap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/reap.hpp b/3rdparty/libprocess/include/process/reap.hpp
index d7e0fa3..211fc70 100644
--- a/3rdparty/libprocess/include/process/reap.hpp
+++ b/3rdparty/libprocess/include/process/reap.hpp
@@ -16,14 +16,47 @@
 #include <sys/types.h>
 
 #include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
 
+#include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
+#include <stout/result.hpp>
 
 namespace process {
 
 // The upper bound for the poll interval in the reaper.
 Duration MAX_REAP_INTERVAL();
 
+namespace internal {
+
+class ReaperProcess : public Process<ReaperProcess>
+{
+public:
+  ReaperProcess();
+
+  Future<Option<int>> reap(pid_t pid);
+
+protected:
+  virtual void initialize();
+
+  void wait();
+
+  void notify(pid_t pid, Result<int> status);
+
+private:
+  const Duration interval();
+
+  multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
+};
+
+
+// Global reaper process. Defined in process.cpp.
+extern PID<ReaperProcess> reaper;
+
+} // namespace internal {
+
+
 // Returns the exit status of the specified process if and only if
 // the process is a direct child and it has not already been reaped.
 // Otherwise, returns None once the process has been reaped elsewhere

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 8af056e..b6ef0f2 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -78,6 +78,7 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/profiler.hpp>
+#include <process/reap.hpp>
 #include <process/sequence.hpp>
 #include <process/socket.hpp>
 #include <process/statistics.hpp>
@@ -553,6 +554,12 @@ PID<metrics::internal::MetricsProcess> metrics;
 } // namespace internal {
 } // namespace metrics {
 
+namespace internal {
+
+PID<process::internal::ReaperProcess> reaper;
+
+} // namespace internal {
+
 
 namespace http {
 
@@ -1104,6 +1111,10 @@ bool initialize(
   // Create the global HTTP authentication router.
   authenticator_manager = new AuthenticatorManager();
 
+  // Create the global reaper process.
+  process::internal::reaper =
+    spawn(new process::internal::ReaperProcess(), true);
+
   // Initialize the mime types.
   mime::initialize();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3aeb24/3rdparty/libprocess/src/reap.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/reap.cpp b/3rdparty/libprocess/src/reap.cpp
index 5fc2a4d..380edbb 100644
--- a/3rdparty/libprocess/src/reap.cpp
+++ b/3rdparty/libprocess/src/reap.cpp
@@ -28,6 +28,7 @@
 #include <stout/foreach.hpp>
 #include <stout/multihashmap.hpp>
 #include <stout/none.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/result.hpp>
 #include <stout/try.hpp>
@@ -60,108 +61,103 @@ Duration MIN_REAP_INTERVAL() { return Milliseconds(100); }
 const size_t HIGH_PID_COUNT = 500;
 Duration MAX_REAP_INTERVAL() { return Seconds(1); }
 
+namespace internal {
 
-class ReaperProcess : public Process<ReaperProcess>
+ReaperProcess::ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
+
+
+Future<Option<int>> ReaperProcess::reap(pid_t pid)
 {
-public:
-  ReaperProcess() : ProcessBase(ID::generate("__reaper__")) {}
-
-  Future<Option<int>> reap(pid_t pid)
-  {
-    // Check to see if this pid exists.
-    if (os::exists(pid)) {
-      Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
-      promises.put(pid, promise);
-      return promise->future();
-    } else {
-      return None();
-    }
+  // Check to see if this pid exists.
+  if (os::exists(pid)) {
+    Owned<Promise<Option<int>>> promise(new Promise<Option<int>>());
+    promises.put(pid, promise);
+    return promise->future();
+  } else {
+    return None();
   }
+}
 
-protected:
-  virtual void initialize() { wait(); }
-
-  void wait()
-  {
-    // There are two cases to consider for each pid when it terminates:
-    //   1) The process is our child. In this case, we will reap the process and
-    //      notify with the exit status.
-    //   2) The process was not our child. In this case, it will be reaped by
-    //      someone else (its parent or init, if reparented) so we cannot know
-    //      the exit status and we must notify with None().
-    //
-    // NOTE: A child can only be reaped by us, the parent. If a child exits
-    // between waitpid and the (!exists) conditional it will still exist as a
-    // zombie; it will be reaped by us on the next loop.
-    foreach (pid_t pid, promises.keys()) {
-      int status;
-      Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
-      if (child_pid.isSome()) {
-        // We have reaped a child.
-        notify(pid, status);
-      } else if (!os::exists(pid)) {
-        // The process no longer exists and has been reaped by someone else.
-        notify(pid, None());
-      }
-    }
 
-    delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
-  }
+void ReaperProcess::initialize()
+{
+  wait();
+}
 
-  void notify(pid_t pid, Result<int> status)
-  {
-    foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
-      if (status.isError()) {
-        promise->fail(status.error());
-      } else if (status.isNone()) {
-        promise->set(Option<int>::none());
-      } else {
-        promise->set(Option<int>(status.get()));
-      }
+
+void ReaperProcess::wait()
+{
+  // There are two cases to consider for each pid when it terminates:
+  //   1) The process is our child. In this case, we will reap the process and
+  //      notify with the exit status.
+  //   2) The process was not our child. In this case, it will be reaped by
+  //      someone else (its parent or init, if reparented) so we cannot know
+  //      the exit status and we must notify with None().
+  //
+  // NOTE: A child can only be reaped by us, the parent. If a child exits
+  // between waitpid and the (!exists) conditional it will still exist as a
+  // zombie; it will be reaped by us on the next loop.
+  foreach (pid_t pid, promises.keys()) {
+    int status;
+    Result<pid_t> child_pid = os::waitpid(pid, &status, WNOHANG);
+    if (child_pid.isSome()) {
+      // We have reaped a child.
+      notify(pid, status);
+    } else if (!os::exists(pid)) {
+      // The process no longer exists and has been reaped by someone else.
+      notify(pid, None());
     }
-    promises.remove(pid);
   }
 
-private:
-  const Duration interval()
-  {
-    size_t count = promises.size();
+  delay(interval(), self(), &ReaperProcess::wait); // Reap forever!
+}
 
-    if (count <= LOW_PID_COUNT) {
-      return MIN_REAP_INTERVAL();
-    } else if (count >= HIGH_PID_COUNT) {
-      return MAX_REAP_INTERVAL();
+
+void ReaperProcess::notify(pid_t pid, Result<int> status)
+{
+  foreach (const Owned<Promise<Option<int>>>& promise, promises.get(pid)) {
+    if (status.isError()) {
+      promise->fail(status.error());
+    } else if (status.isNone()) {
+      promise->set(Option<int>::none());
+    } else {
+      promise->set(Option<int>(status.get()));
     }
+  }
+  promises.remove(pid);
+}
 
-    // Linear interpolation between min and max reap intervals.
-    double fraction =
-      ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
 
-    return (MIN_REAP_INTERVAL() +
-            (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+const Duration ReaperProcess::interval()
+{
+  size_t count = promises.size();
+
+  if (count <= LOW_PID_COUNT) {
+    return MIN_REAP_INTERVAL();
+  } else if (count >= HIGH_PID_COUNT) {
+    return MAX_REAP_INTERVAL();
   }
 
-  multihashmap<pid_t, Owned<Promise<Option<int>>>> promises;
-};
+  // Linear interpolation between min and max reap intervals.
+  double fraction =
+    ((double) (count - LOW_PID_COUNT) / (HIGH_PID_COUNT - LOW_PID_COUNT));
 
+  return (MIN_REAP_INTERVAL() +
+          (MAX_REAP_INTERVAL() - MIN_REAP_INTERVAL()) * fraction);
+}
 
-// Global reaper object.
-static ReaperProcess* reaper = nullptr;
+} // namespace internal {
 
 
 Future<Option<int>> reap(pid_t pid)
 {
-  static Once* initialized = new Once();
-
-  if (!initialized->once()) {
-    reaper = new ReaperProcess();
-    spawn(reaper);
-    initialized->done();
-  }
-
-  CHECK_NOTNULL(reaper);
+  // The reaper process is instantiated in `process::initialize`.
+  process::initialize();
 
-  return dispatch(reaper, &ReaperProcess::reap, pid);
+  return dispatch(
+      internal::reaper,
+      &internal::ReaperProcess::reap,
+      pid);
 }
 
 } // namespace process {


[4/8] mesos git commit: Libprocess Reinit: Modified test to use PID<MetricsProcess>.

Posted by jo...@apache.org.
Libprocess Reinit: Modified test to use PID<MetricsProcess>.

Updates the test's reference to the global metrics singleton.

Review: https://reviews.apache.org/r/40411/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/757727cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/757727cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/757727cb

Branch: refs/heads/master
Commit: 757727cba114adcf73901406396e911ce7282bff
Parents: e705d5f
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:37 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:50 2016 -0700

----------------------------------------------------------------------
 src/tests/scheduler_driver_tests.cpp | 4 +---
 src/tests/scheduler_tests.cpp        | 4 +---
 2 files changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/757727cb/src/tests/scheduler_driver_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_driver_tests.cpp b/src/tests/scheduler_driver_tests.cpp
index faf2e6c..3705d13 100644
--- a/src/tests/scheduler_driver_tests.cpp
+++ b/src/tests/scheduler_driver_tests.cpp
@@ -62,8 +62,6 @@ using process::PID;
 
 using process::http::OK;
 
-using process::metrics::internal::MetricsProcess;
-
 using std::vector;
 
 using testing::_;
@@ -96,7 +94,7 @@ TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
   AWAIT_READY(registered);
 
   Future<process::http::Response> response =
-    process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    process::http::get(process::metrics::internal::metrics, "snapshot");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);

http://git-wip-us.apache.org/repos/asf/mesos/blob/757727cb/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index abd1f61..6e876a7 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -75,8 +75,6 @@ using process::Queue;
 
 using process::http::OK;
 
-using process::metrics::internal::MetricsProcess;
-
 using std::cout;
 using std::endl;
 using std::string;
@@ -356,7 +354,7 @@ TEST_P(SchedulerTest, MetricsEndpoint)
   AWAIT_READY(subscribed);
 
   Future<process::http::Response> response =
-    process::http::get(MetricsProcess::instance()->self(), "snapshot");
+    process::http::get(process::metrics::internal::metrics, "snapshot");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);


[8/8] mesos git commit: Libprocess Reinit: Added a test-only method to reinitialize libprocess.

Posted by jo...@apache.org.
Libprocess Reinit: Added a test-only method to reinitialize libprocess.

This builds upon earlier changes to complete `process::finalize`.
Tests which need to reconfigure libprocess, such as some SSL-related
tests, can use `process::reinitialize` to do so.

This method may also be useful for providing additional isolation
between tests, such as cleaning up all processes after each test case.

Review: https://reviews.apache.org/r/40512/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16f479d1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16f479d1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16f479d1

Branch: refs/heads/master
Commit: 16f479d151d5a6554f8ebfcedfdc6b62dc7a0edb
Parents: 03c79ab
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:42 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:51 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/openssl.cpp |  3 +++
 3rdparty/libprocess/src/process.cpp | 33 ++++++++++++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/16f479d1/3rdparty/libprocess/src/openssl.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.cpp b/3rdparty/libprocess/src/openssl.cpp
index 8e4e049..e6f17e4 100644
--- a/3rdparty/libprocess/src/openssl.cpp
+++ b/3rdparty/libprocess/src/openssl.cpp
@@ -286,6 +286,9 @@ string error_string(unsigned long code)
 // this function, it is not visible. This is the preferred behavior as
 // we do not want applications changing these settings while they are
 // running (this would be undefined behavior).
+// NOTE: This does not change the configuration of existing sockets, such
+// as the server socket spawned during libprocess initialization.
+// See `reinitialize` in `process.cpp`.
 void reinitialize()
 {
   // Wipe out and recreate the default flags.

http://git-wip-us.apache.org/repos/asf/mesos/blob/16f479d1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b6ef0f2..ab2b5a9 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -489,6 +489,11 @@ private:
 };
 
 
+// Synchronization primitives for `initialize`.
+// See documentation in `initialize` for how they are used.
+static std::atomic_bool initialize_started(false);
+static std::atomic_bool initialize_complete(false);
+
 // Server socket listen backlog.
 static const int LISTEN_BACKLOG = 500000;
 
@@ -865,6 +870,32 @@ void install(vector<Owned<FirewallRule>>&& rules)
 
 } // namespace firewall {
 
+
+// Tests can declare this function and use it to re-configure libprocess
+// programmatically. Without explicitly declaring this function, it
+// is not visible. This is the preferred behavior as we do not want
+// applications changing these settings while they are
+// running (this would be undefined behavior).
+// NOTE: If the test is changing SSL-related configuration, the SSL library
+// must be reinitialized first.  See `reinitialize` in `openssl.cpp`.
+void reinitialize(
+    const Option<string>& delegate,
+    const Option<string>& readwriteAuthenticationRealm,
+    const Option<string>& readonlyAuthenticationRealm)
+{
+  process::finalize();
+
+  // Reset the initialization synchronization primitives.
+  initialize_started.store(false);
+  initialize_complete.store(false);
+
+  process::initialize(
+      delegate,
+      readwriteAuthenticationRealm,
+      readonlyAuthenticationRealm);
+}
+
+
 bool initialize(
     const Option<string>& delegate,
     const Option<string>& readwriteAuthenticationRealm,
@@ -879,8 +910,6 @@ bool initialize(
   // frequently throughout the code base. Therefore we chose to use
   // atomics rather than `Once`, as the overhead of a mutex and
   // condition variable is excessive here.
-  static std::atomic_bool initialize_started(false);
-  static std::atomic_bool initialize_complete(false);
 
   // Try and do the initialization or wait for it to complete.
 


[5/8] mesos git commit: Libprocess Reinit: Moved MetricsProcess instantiation into process.cpp.

Posted by jo...@apache.org.
Libprocess Reinit: Moved MetricsProcess instantiation into process.cpp.

The metrics singleton must be unified with `process::initialize` so
that it also falls under the scope of reinitialization.  The singleton
must also not be guarded by `Once`.

Review: https://reviews.apache.org/r/40410/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e705d5fd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e705d5fd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e705d5fd

Branch: refs/heads/master
Commit: e705d5fd2e86d23dcfafc9cefcad85a184da5565
Parents: 0a64d7a
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:35 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:50 2016 -0700

----------------------------------------------------------------------
 .../include/process/metrics/metrics.hpp         |  29 ++---
 3rdparty/libprocess/src/metrics/metrics.cpp     | 107 +++++++------------
 3rdparty/libprocess/src/process.cpp             |  14 ++-
 3rdparty/libprocess/src/tests/system_tests.cpp  |   4 +-
 4 files changed, 69 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e705d5fd/3rdparty/libprocess/include/process/metrics/metrics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metrics.hpp b/3rdparty/libprocess/include/process/metrics/metrics.hpp
index 54487ab..d9c64b7 100644
--- a/3rdparty/libprocess/include/process/metrics/metrics.hpp
+++ b/3rdparty/libprocess/include/process/metrics/metrics.hpp
@@ -29,16 +29,12 @@
 
 namespace process {
 namespace metrics {
-
-// Initializes the metrics library.
-void initialize(const Option<std::string>& authenticationRealm = None());
-
 namespace internal {
 
 class MetricsProcess : public Process<MetricsProcess>
 {
 public:
-  static MetricsProcess* instance();
+  static MetricsProcess* create(const Option<std::string>& authenticationRealm);
 
   Future<Nothing> add(Owned<Metric> metric);
 
@@ -83,24 +79,27 @@ private:
   // Used to rate limit the snapshot endpoint.
   Option<Owned<RateLimiter>> limiter;
 
-  // Needed for access to the private constructor.
-  friend void process::metrics::initialize(
-      const Option<std::string>& authenticationRealm);
-
   // The authentication realm that metrics HTTP endpoints are installed into.
   const Option<std::string> authenticationRealm;
 };
 
+
+// Global metrics process. Defined in process.cpp.
+extern PID<MetricsProcess> metrics;
+
 }  // namespace internal {
 
 
 template <typename T>
 Future<Nothing> add(const T& metric)
 {
+  // The metrics process is instantiated in `process::initialize`.
+  process::initialize();
+
   // There is an explicit copy in this call to ensure we end up owning
   // the last copy of a Metric when we remove it.
   return dispatch(
-      internal::MetricsProcess::instance(),
+      internal::metrics,
       &internal::MetricsProcess::add,
       Owned<Metric>(new T(metric)));
 }
@@ -108,8 +107,11 @@ Future<Nothing> add(const T& metric)
 
 inline Future<Nothing> remove(const Metric& metric)
 {
+  // The metrics process is instantiated in `process::initialize`.
+  process::initialize();
+
   return dispatch(
-      internal::MetricsProcess::instance(),
+      internal::metrics,
       &internal::MetricsProcess::remove,
       metric.name());
 }
@@ -118,8 +120,11 @@ inline Future<Nothing> remove(const Metric& metric)
 inline Future<hashmap<std::string, double>> snapshot(
     const Option<Duration>& timeout)
 {
+  // The metrics process is instantiated in `process::initialize`.
+  process::initialize();
+
   return dispatch(
-      internal::MetricsProcess::instance(),
+      internal::metrics,
       &internal::MetricsProcess::snapshot,
       timeout);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e705d5fd/3rdparty/libprocess/src/metrics/metrics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/metrics/metrics.cpp b/3rdparty/libprocess/src/metrics/metrics.cpp
index 4ab3ac7..29c8c1d 100644
--- a/3rdparty/libprocess/src/metrics/metrics.cpp
+++ b/3rdparty/libprocess/src/metrics/metrics.cpp
@@ -19,7 +19,6 @@
 #include <process/collect.hpp>
 #include <process/dispatch.hpp>
 #include <process/help.hpp>
-#include <process/once.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
 
@@ -39,85 +38,57 @@ using std::vector;
 
 namespace process {
 namespace metrics {
+namespace internal {
 
-
-static internal::MetricsProcess* metrics_process = nullptr;
-
-
-void initialize(const Option<string>& authenticationRealm)
+MetricsProcess* MetricsProcess::create(
+    const Option<string>& authenticationRealm)
 {
-  // To prevent a deadlock, we must ensure libprocess is
-  // initialized. Otherwise, libprocess will be implicitly
-  // initialized inside the 'once' block below, which in
-  // turns initializes metrics, and we arrive back here
-  // and deadlock by calling 'once()' without allowing
-  // 'done()' to ever be called.
-  process::initialize();
-
-  static Once* initialized = new Once();
-  if (!initialized->once()) {
-    Option<string> limit =
-      os::getenv("LIBPROCESS_METRICS_SNAPSHOT_ENDPOINT_RATE_LIMIT");
-
-    Option<Owned<RateLimiter>> limiter;
-
-    // By default, we apply a rate limit of 2 requests
-    // per second to the metrics snapshot endpoint in
-    // order to maintain backwards compatibility (before
-    // this was made configurable, we hard-coded a limit
-    // of 2 requests per second).
-    if (limit.isNone()) {
-      limiter = Owned<RateLimiter>(new RateLimiter(2, Seconds(1)));
-    } else if (limit->empty()) {
-      limiter = None();
-    } else {
-      // TODO(vinod): Move this parsing logic to flags
-      // once we have a 'Rate' abstraction in stout.
-      Option<Error> reason;
-      vector<string> tokens = strings::tokenize(limit.get(), "/");
-
-      if (tokens.size() == 2) {
-        Try<int> requests = numify<int>(tokens[0]);
-        Try<Duration> interval = Duration::parse(tokens[1]);
-
-        if (requests.isError()) {
-          reason = Error(
-              "Failed to parse the number of requests: " + requests.error());
-        } else if (interval.isError()) {
-          reason = Error(
-              "Failed to parse the interval: " + interval.error());
-        } else {
-          limiter = Owned<RateLimiter>(
-              new RateLimiter(requests.get(), interval.get()));
-        }
+  Option<string> limit =
+    os::getenv("LIBPROCESS_METRICS_SNAPSHOT_ENDPOINT_RATE_LIMIT");
+
+  Option<Owned<RateLimiter>> limiter;
+
+  // By default, we apply a rate limit of 2 requests
+  // per second to the metrics snapshot endpoint in
+  // order to maintain backwards compatibility (before
+  // this was made configurable, we hard-coded a limit
+  // of 2 requests per second).
+  if (limit.isNone()) {
+    limiter = Owned<RateLimiter>(new RateLimiter(2, Seconds(1)));
+  } else if (limit->empty()) {
+    limiter = None();
+  } else {
+    // TODO(vinod): Move this parsing logic to flags
+    // once we have a 'Rate' abstraction in stout.
+    Option<Error> reason;
+    vector<string> tokens = strings::tokenize(limit.get(), "/");
+
+    if (tokens.size() == 2) {
+      Try<int> requests = numify<int>(tokens[0]);
+      Try<Duration> interval = Duration::parse(tokens[1]);
+
+      if (requests.isError()) {
+        reason = Error(
+            "Failed to parse the number of requests: " + requests.error());
+      } else if (interval.isError()) {
+        reason = Error(
+            "Failed to parse the interval: " + interval.error());
+      } else {
+        limiter = Owned<RateLimiter>(
+            new RateLimiter(requests.get(), interval.get()));
       }
+    }
 
-      if (limiter.isNone()) {
+    if (limiter.isNone()) {
         EXIT(EXIT_FAILURE)
           << "Failed to parse LIBPROCESS_METRICS_SNAPSHOT_ENDPOINT_RATE_LIMIT "
           << "'" << limit.get() << "'"
           << " (format is <number of requests>/<interval duration>)"
           << (reason.isSome() ? ": " + reason.get().message : "");
-      }
     }
-
-    metrics_process =
-      new internal::MetricsProcess(limiter, authenticationRealm);
-    spawn(metrics_process);
-
-    initialized->done();
   }
-}
-
-
-namespace internal {
-
-
-MetricsProcess* MetricsProcess::instance()
-{
-  metrics::initialize();
 
-  return metrics_process;
+  return new MetricsProcess(limiter, authenticationRealm);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e705d5fd/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2be8e84..8af056e 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -545,6 +545,14 @@ THREAD_LOCAL ProcessBase* __process__ = nullptr;
 // Per thread executor pointer.
 THREAD_LOCAL Executor* _executor_ = nullptr;
 
+namespace metrics {
+namespace internal {
+
+PID<metrics::internal::MetricsProcess> metrics;
+
+} // namespace internal {
+} // namespace metrics {
+
 
 namespace http {
 
@@ -1079,8 +1087,10 @@ bool initialize(
   // Create global help process.
   help = spawn(new Help(delegate), true);
 
-  // Initialize the global metrics process.
-  metrics::initialize(readonlyAuthenticationRealm);
+  // Create the global metrics process.
+  metrics::internal::metrics = spawn(
+      metrics::internal::MetricsProcess::create(readonlyAuthenticationRealm),
+      true);
 
   // Create the global logging process.
   _logging = spawn(new Logging(readwriteAuthenticationRealm), true);

http://git-wip-us.apache.org/repos/asf/mesos/blob/e705d5fd/3rdparty/libprocess/src/tests/system_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/system_tests.cpp b/3rdparty/libprocess/src/tests/system_tests.cpp
index 0f4d042..6beb973 100644
--- a/3rdparty/libprocess/src/tests/system_tests.cpp
+++ b/3rdparty/libprocess/src/tests/system_tests.cpp
@@ -29,8 +29,6 @@ namespace http = process::http;
 
 using process::Future;
 
-using process::metrics::internal::MetricsProcess;
-
 // MESOS-1433
 // This test is disabled as the Gauges that are used for these metrics
 // may return Failures. In this case we do not put the metric into the
@@ -41,7 +39,7 @@ using process::metrics::internal::MetricsProcess;
 TEST(SystemTest, DISABLED_Metrics)
 {
   Future<http::Response> response =
-    http::get(MetricsProcess::instance()->self(), "snapshot");
+    http::get(process::metrics::internal::metrics, "snapshot");
 
   AWAIT_READY(response);
 


[7/8] mesos git commit: Libprocess Reinit: Changed Socket::DEFAULT_KIND to a non-static value.

Posted by jo...@apache.org.
Libprocess Reinit: Changed Socket::DEFAULT_KIND to a non-static value.

This is needed for tests that utilize the test-only
`process::reinitialize` function.

Review: https://reviews.apache.org/r/40268/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/03c79ab3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/03c79ab3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/03c79ab3

Branch: refs/heads/master
Commit: 03c79ab35e2cbb2e319cb81197ecfff5e60268b8
Parents: 3e3aeb2
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Mon Oct 24 15:06:41 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 24 16:18:51 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  2 +-
 3rdparty/libprocess/src/socket.cpp             | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/03c79ab3/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 67551a9..f798af7 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -77,7 +77,7 @@ public:
    *
    * @see process::network::Socket::Kind
    */
-  static const Kind& DEFAULT_KIND();
+  static Kind DEFAULT_KIND();
 
   /**
    * Returns the kind representing the underlying implementation

http://git-wip-us.apache.org/repos/asf/mesos/blob/03c79ab3/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 6089248..7f93168 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -102,16 +102,15 @@ Try<Socket> Socket::create(Kind kind, Option<int> s)
 }
 
 
-const Socket::Kind& Socket::DEFAULT_KIND()
+Socket::Kind Socket::DEFAULT_KIND()
 {
-  static const Kind DEFAULT =
+  // NOTE: Some tests may change the OpenSSL flags and reinitialize
+  // libprocess. In non-test code, the return value should be constant.
 #ifdef USE_SSL_SOCKET
-      network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
+      return network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
 #else
-      Socket::POLL;
+      return Socket::POLL;
 #endif
-
-  return DEFAULT;
 }